Coder Social home page Coder Social logo

blog's Introduction

Blog

  • 个人技术博客,源码分析会放在 SourceCodeAnalysis 相应目录,一些文章会出现在 Issues 中。
  • 内容主要为后端技术,目前使用语言是 Go。
  • 关键词:Go | Linux | Docker | Kubernetes

blog's People

Contributors

llleon avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar

blog's Issues

使用 Prometheus 监控 MongoDB

0. 安装 Prometheus 和 Grafana 等组件

通过 Minikube 来安装 Kubernetes 测试环境。

$ minikube start --cpus 4 --memory 8192 --vm-driver hyperkit
$ helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
$ helm repo add stable https://kubernetes-charts.storage.googleapis.com/
$ helm repo update
$ helm install prometheus prometheus-community/kube-prometheus-stack

可以使用 kubectl 查看安装好的各组件。

1. 安装 MongoDB

mongodb.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: mongodb-deployment
  labels:
    app: mongodb
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mongodb
  template:
    metadata:
      labels:
        app: mongodb
    spec:
      containers:
      - name: mongodb
        image: mongo
        ports:
        - containerPort: 27017
---
apiVersion: v1
kind: Service
metadata:
  name: mongodb-service
spec:
  selector:
    app: mongodb
  ports:
    - protocol: TCP
      port: 27017
      targetPort: 27017

安装:

$ kubectl apply -f mongodb.yaml

2. 安装 Exporter

Exporter 是什么:将要采集数据的应用数据转换成 Prometheus 能理解的 metrics。

工作流程:1)Exporter 从目标应用获取 metrics 数据。2)转换为正确格式。3)在 /metrics 端点暴露 metrics。4)Promettheus 的 Retrieval 拉取 metrics 数据。5)Promettheus 将 metrics 数据存储到时序数据库。6)Promettheus 的 HTTP Server 提供从数据库获取数据的 API,可以通过 PromQL 来查询数据。

需要安装 3 个组件:

  • Exporter 应用:用来暴露 /metrics 端点
  • Service:用来连接到 Exporter
  • ServiceMonitor:让 Prometheus 知道有一个新的端点需要被 scrape。

可以通过 Helm 来安装。

先添加 repo:

$ helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
$ helm repo update

查看 chart 的 values:

$ helm show values prometheus-community/prometheus-mongodb-exporter

有几个值需要指定:

values.yaml

mongodb:
  uri: "mongodb://mongodb-service:27017" # 指定 MongoDB SVC endpoint

serviceMonitor:
  additionalLabels:
    release: prometheus # ServiceMonitor 标签,使其可以被 Prometheus 发现

安装:

helm install mongodb-exporter prometheus-community/prometheus-mongodb-exporter -f values.yaml

检查 SVC 及 ServiceMonitor 是否正常:

$ kubectl port-forward service/mongodb-exporter-prometheus-mongodb-exporter 9216
$ curl localhost:9216
$ kubectl get servicemonitor

3. 使用 Grafana 查看数据

$ kubectl port-forward deployment/prometheus-grafana 3000

用户名 admin,获取密码:

$ kubectl get secret prometheus-grafana -o jsonpath="{.data.admin-password}" | base64 --decode ; echo

浏览器访问 http://localhost:3000/,登录后即可在 Manage 中查看 MongoDB 的 CPU、内存等的使用情况了。

X86-32 平台操作系统的加载

这段时间学操作系统,好奇计算机是怎么从通电到成功加载操作系统的,看了一些文章顺便做下总结。

第 0、1 小节介绍了一些地址和寄存器的基本概念,后面介绍了 80386 从通电后,怎么把操作系统加载到内存中来运行的过程。

0. 几个地址的概念

先来理解这几个地址的概念:物理地址、虚拟地址(线性地址)、逻辑地址。

任何时候,计算机上都存在一个程序能够产生的地址集合,我们称之为地址范围。这个范围的大小由 CPU 的位数决定,例如一个 32 位的 CPU,它的地址范围是 0~0xFFFFFFFF(4G), 而对于一个 64 位的 CPU,它的地址范围为 0~0xFFFFFFFFFFFFFFFF(64T)。 这个范围就是程序能够产生的地址范围,我们把这个地址范围称为虚拟地址空间,该空间中的某一个地址我们称之为虚拟地址。与虚拟地址空间和虚拟地址相对应的则是物理地址空间和物理地址,大多数时候我们的系统所具备的物理地址空间只是虚拟地址空间的一个子集。

举一个最简单的例子直观地说明这两者:对于一台内存为 256M 的 32bit X86 主机来说,它的虚拟地址空间范围是 0~0xFFFFFFFF(4G), 而物理地址空间范围是 0x000000000~0x0FFFFFFF(256M)。

这里有一个虚拟内存的概念,虚拟内存(virtual memory)是对整个内存(不要和机器上插那条对上号)的抽像描述。它是相对于物理内存来讲的,能直接理解成 “不直实的”,“假的” 内存。现代操作系统都提供了一种内存管理的抽像,即虚拟内存(virtual memory)。进程使用虚拟内存中的地址,由操作系统协助相关硬件,把它 “转换” 成真正的物理地址。这个“转换”,是所有问题讨论的关键。
有了这样的抽像,一个程序就能使用比真实物理地址大得多的地址空间(拆东墙,补西墙,银行也是这样子做的),甚至多个进程能使用相同的地址。这不奇怪,因为转换后的物理地址并非相同的。

  • 逻辑地址(Logical Address):是指由程序产生的与段相关的偏移地址部分。是在有地址变换功能的计算机中,访内指令给出的地址(操作数)叫逻辑地址, 也叫相对地址,也就是是机器语言指令中,用来指定一个操作数或是一条指令的地址。要经过寻址方式的计算或变换才得到内存储器中的实际有效地址即物理地址。

    一个逻辑地址由两部分组成,段标识符: 段内偏移量。段标识符是由一个 16 位长的字段组成,称为段选择子(Segment Selector),其中前 13 位是个索引号,后面 3 位包含其它信息 (后面有介绍)。而偏移量是一个 32 位长的字段。

  • 线性地址(Linear Address):也叫虚拟地址(Virtual Address)。是逻辑地址到物理地址变换之间的中间层。在分段部件中,逻辑地址是段中的偏移地址,然后加上基地址就是线性地址。

    线性地址是一个 32 位无符号整数,可以用来表示高达 4GB 的地址,也就是,高达 4294967296 个内存单元。线性地址通常用十六进制数字表示,值的范围从 0x00000000 ~ 0xffffffff 。程序代码会产生逻辑地址,通过逻辑地址变换就可以生成一个线性地址。

    如果启用了分页机制,那么线性地址可以再经过变换以产生一个物理地址。如果没有启用分页机制,那么线性地址就是物理地址。

  • 物理地址(Physical Address):CPU 地址总线传来的地址,由硬件电路控制(现在这些硬件是可编程的了)其具体含义。物理地址中很大一部分是留给内存条中的内存的,但也常被映射到其他存储器上(如显存、BIOS 等)。

    在没有使用虚拟存储器的机器上,虚拟地址被直接送到内存总线上,使具有相同地址的物理存储器被读写;而在使用了虚拟存储器的情况下,虚拟地址不是被直接送到内存地址总线上,而是送到存储器管理单元 MMU,把虚拟地址映射为物理地址。

CPU 将一个逻辑地址转换为物理地址,需要进行两步:

  1. 将给定的逻辑地址(其实是段内偏移量,这个一定要理解!),CPU 内存管理单元(MMU)利用其段式内存管理单元(segmentation unit),先将逻辑地址转换成一个线性地址。
  2. 利用其页式内存管理单元(paging unit),把线性地址转换为物理地址

这样做两次转换,的确是非常麻烦而且没有必要的,因为直接可以把线性地址抽象给进程。之所以这样冗余,Intel 完全是为了兼容而已(Intel 为了兼容,将远古时代的段式内存管理方式保留了下来,x86 体系的处理器刚开始时只有 20 根地址线,寻址寄存器是 16 位。我们知道 16 位的寄存器可以访问 64K 的地址空间,如果程序要想访问大于 64K 的内存,就需要把内存分段,每段 64K,用段地址+偏移量的方式来访问,这样使 20 根地址线全用上,最大的寻址空间就可以到 1M 字节,这在当时已经是非常大的内存空间了。

现代的多用户多进程操作系统,需要 MMU 才能达到每个用户进程都拥有自己独立的地址空间的目标。使用 MMU,操作系统划分出一段地址区域,在这块地址区域中, 每个进程看到的内容都不一定一样。例如 Windows 操作系统将地址范围 4M-2G 划分为用户地址空间,进程 A 在地址 0X400000(4M)映射了可执行文件,进程 B 同样在地址 0X400000(4M)映射了可执行文件,如果 A 进程读地址 0X400000,读到的是 A 的可执行文件映射到 RAM 的内容,而进程 B 读取地址 0X400000 时,读到的则是 B 的可执行文件映射到 RAM 的内容。这就是 MMU 在当中进行地址转换所起的作用。

1. X86 寄存器说明

32 位 CPU 所含有的寄存器有:

  • 4 个数据寄存器:EAX、EBX、ECX 和 EDX。
  • 2 个变址和指针寄存器:ESI 和 EDI。
  • 2 个指针寄存器:ESP 和 EBP。
  • 6 个段寄存器:ES、CS、SS、DS、FS 和 GS。
  • 1 个指令指针寄存器:EIP。
  • 1 个标志寄存器:EFlags。

1.1 数据寄存器

数据寄存器主要用来保存操作数和运算结果等信息,从而节省读取操作数所需占用总线和访问存储器的时间。

32 位 CPU 有 4 个 32 位的通用寄存器 EAX、EBX、ECX 和 EDX。对低 16 位数据的存取,不会影响高 16 位的数据。这些低 16 位寄存器分别命名为:AX、BX、CX 和 DX,它和先前的 16 位 CPU 中的寄存器相一致。

4 个 16 位寄存器又可分割成 8 个独立的 8 位寄存器(AX:AH-AL、BX:BH-BL、CX:CH-CL、DX:DH-DL),每个寄存器都有自己的名称,可独立存取。程序员可利用数据寄存器的这种“可分可合”的特性,灵活地处理字/字节的信息。

  • 寄存器 AX 和 AL 通常称为累加器(Accumulator),用累加器进行的操作可能需要更少时间。累加器可用于乘、除、输入/输出等操作,它们的使用频率很高。
  • 寄存器 BX 称为基地址寄存器(BaseRegister)。它可作为存储器指针来使用。
  • 寄存器 CX 称为计数寄存器(CountRegister)。在循环和字符串操作时,要用它来控制循环次数;在位操作中,当移多位时,要用 CL 来指明移位的位数。
  • 寄存器 DX 称为数据寄存器(DataRegister)。在进行乘、除运算时,它可作为默认的操作数参与运算,也可用于存放 I/O 的端口地址。

在 16 位 CPU 中,AX、BX、CX 和 DX 不能作为基址和变址寄存器来存放存储单元的地址,但在 32 位 CPU 中,其 32 位寄存器 EAX、EBX、ECX 和 EDX 不仅可传送数据、暂存数据保存算术逻辑运算结果,而且可作为指针寄存器,所以,这些 32 位寄存器更具有通用性。

1.2 变址寄存器

32 位 CPU 有 2 个 32 位通用寄存器 ESI 和 EDI。其低 16 位对应先前 16 位 CPU 中的 SI 和 DI,对低 16 位数据的存取,不影响高 16 位的数据。

寄存器 ESI、EDI、SI 和 DI 称为变址寄存器(IndexRegister),它们主要用于存放存储单元在段内的偏移量,用它们可实现多种存储器操作数的寻址方式,为以不同的地址形式访问存储单元提供方便。

变址寄存器不可分割成 8 位寄存器。作为通用寄存器,也可存储算术逻辑运算的操作数和运算结果

它们可作一般的存储器指针使用。在字符串操作指令的执行过程中,对它们有特定的要求,而且还具有特殊的功能。

1.3 指针寄存器

32 位 CPU 有 2 个 32 位通用寄存器 EBP 和 ESP。其低 16 位对应先前 16 位 CPU 中的 BP 和 SP,对低 16 位数据的存取,不影响高 16 位的数据。

寄存器 EBP、ESP、BP 和 SP 称为指针寄存器(PointerRegister),主要用于存放堆栈内存储单元的偏移量,用它们可实现多种存储器操作数的寻址方式,为以不同的地址形式访问存储单元提供方便。

指针寄存器不可分割成 8 位寄存器。作为通用寄存器,也可存储算术逻辑运算的操作数和运算结果

它们主要用于访问堆栈内的存储单元,并且规定:

  • BP 为基指针(BasePointer)寄存器,用它可直接存取堆栈中的数据
  • SP 为堆栈指针(StackPointer)寄存器,用它只可访问栈顶

1.4 段寄存器

在 32 位 CPU 中,有 6 个 16 位的段寄存器,所以,在此环境下开发的程序最多可同时访问 6 个段。

段寄存器是根据内存分段的管理模式而设置的。内存单元的物理地址是由段寄存器的值和一个偏移量组合而成
的,这样可用两个较少位数的值组合成一个可访问较大物理空间的内存地址。

CPU 内部的段寄存器为:

CS(Code Segment):代码段寄存器,其值为代码段的段值。

DS(Data Segment):数据段寄存器,其值为数据段的段值。

SS(Stack Segment):堆栈段寄存器,其值为堆栈段的段值。

ES(Extra Segment):附加段寄存器,其值为附加数据段的段值。

FS(Extra Segment):附加段寄存器,其值为附加数据段的段值。

GS(Extra Segment):附加段寄存器,其值为附加数据段的段值。

尽管只有 6 个段寄存器,但程序可以把同一个段寄存器用于不同地目的,这 6 个段寄存器中的 3 个有专门的用途:

  1. CS:代码段寄存器,指向包含程序指令的段。
  2. SS:栈段寄存器,指向包含当前程序栈的段。
  3. DS:数据段寄存器,指向包含静态数据或者全局数据段。

其它 3 个段寄存器用作一般用途,可以指向任意的数据段。

1.4.1 实模式与保护模式简述

32 位 CPU 有两个不同的工作模式:实模式和保护模式。实模式和保护模式都是 CPU 的工作模式,而 CPU 的工作模式是指 CPU 的寻址方式、寄存器大小等用来反应 CPU 在该环境下如何工作的概念。

在这两种模式下,段寄存器的作用是不同的。有关规定简单描述如下:

实模式: 前 4 个段寄存器 CS、DS、SS 和 ES 与先前 CPU 中的所对应的段寄存器的含义完全一致,内存单元的逻辑地址仍为段基址:段偏移量的形式。为访问某内存段内的数据,必须使用该段寄存器和存储单元的偏移量。

保护模式: 在此模式下,情况要复杂得多,装入段寄存器的不再是段值,而是被称为“段选择子”(Segment Selector)的某个值,下面 1.4.3 小节会介绍。

1.4.2 实模式原理

实模式出现于早期 8086 和 8088 CPU 时期。当时由于 CPU 的性能有限,一共只有 20 位地址线 A19 ~ A0,寻址 1MB 的存储空间,其物理地址范围为 00000H ~ FFFFFH(即 2^20 = 1048576 Byte,所以地址空间只有 1MB)。由于复位后首先从地址高端的 FFFFFH 开始执行指令,所以将地址高端设置位 ROM 空间,而低端作为 RAM 空间。

80386 的实模式是为了与 8086 处理器兼容而设置的,在实模式下,80386 处理器就相当于一个快速的 8086 处理器。80386 处理器被复位或加电的时候以实模式启动,这时候处理器中的各寄存器以实模式的初始值工作。

此时 80386 的 32 位地址线只使用了低 20 位,即可访问 1MB 的物理地址空间。在实模式下,80386 处理器不能对内存进行分页机制的管理,所以指令寻址的地址就是内存中实际的物理地址。在实模式下,所有的段都是可以读、写和执行的。实模式下 80386 不支持优先级,所有的指令相当于工作在特权级(即优先级 0),所以它可以执行所有特权指令,包括读写控制寄存器 CR0 等。这实际上使得在实模式下不太可能设计一个有保护能力的操作系统。

它有 8 个 16 位的通用寄存器,以及 4 个 16 位的段寄存器。所以为了能够通过这些 16 位的寄存器去构成 20 位的主存地址,必须采取一种特殊的方式。当某个指令想要访问某个内存地址时,它通常需要用下面的这种格式来表示:

段基址:段偏移量

其中第一个字段是段基址,它的值是由段寄存器提供的(一般来说,段寄存器有 6 种,上面有介绍)。

第二个字段是段内偏移量,代表要访问的这个内存地址距离这个段基址的偏移。它的值就是由通用寄存器来提供的,所以也是 16 位。那么两个 16 位的值如何组合成一个 20 位的地址呢?CPU 采用的方式是把段寄存器所提供的段基址先向左移4位。这样就变成了一个 20 位的值,然后再与段偏移量相加。即:

物理地址 = 段基址<<4 + 段内偏移

所以假设段寄存器中的值是 0xff00,段偏移量为 0x0110。则这个地址对应的真实物理地址是:0xff00<<4 + 0x0110 = 0xff110

由上面的介绍可见,实模式的“实”更多地体现在其地址是真实的物理地址。

1.4.3 保护模式原理

随着 CPU 的发展,CPU 地址线的个数也从原来的 20 根变为现在的 32 根,可以访问的内存空间也从 1MB 变为现在4GB,寄存器的位数也变为 32 位。所以实模式下的内存地址计算方式就已经不再适合了,因此就引入了现在的保护模式,实现更大空间的、更灵活也更安全的内存访问。

简单地说,通过保护模式,可以把虚拟地址空间映射到不同的物理地址空间,且在超出预设的空间范围会报错(一种保护机制的体现),且可以保证处于低特权级的代码无法访问高特权级的数据(另外一种保护机制的体现)。

在保护模式下,80386 的 32 条地址线全部有效,但是内存寻址方式还是得兼容老办法,即(段基址:段偏移量)的表示方式,这使得其:

  • 可寻址高达 4GB 的线性地址空间和物理地址空间。此时 CPU 中的通用寄存器都要换成 32 位寄存器(除了段寄存器)来保证寄存器能访问所有的 4GB 空间。
  • 可访问 64TB(有 2^14 个段,每个段最大空间为 2^32 字节)的虚拟地址空间。
  • 可采用分段存储管理机制和分页存储管理机制。

保护模式下的偏移值和实模式下是一样的,就是变成了 32 位而已。段值仍旧是存放在原来 16 位的段寄存器中,但是这些段寄存器存放的却不再是段基址了。之前说过实模式下寻址方式不安全,在保护模式下需要加一些限制,而这些限制不是一个寄存器能够容纳的,于是把这些关于内存段的限制信息放在一个叫做全局描述符表(GDT:Global Descriptor Table)的结构里。全局描述符表中含有一个个表项,每一个表项称为段描述符。在保护模式下,段寄存器存放的便是相当于一个数组索引的东西,即段选择子(Segment Selector),通过这个索引,可以找到对应的表项。

在保护模式下,每个内存段就是一个段描述符。段描述符存放了段基址(Base)、段界限(Limit)、内存段类型属性(比如是数据段还是代码段,注意一个段描述符只能用来定义一个内存段)等许多属性,具体信息见下图:

segment

其中,段界限表示段边界的扩张最值,即最大扩展多少或最小扩展多少,用 20 位来表示,它的单位可以是字节,也可以是 4KB,这是由 G 位决定的(G 为 1 时表示单位为 4KB)。

实际段界限边界值 =(描述符中的段界限+1)*(段界限的单位大小(即字节或 4KB))-1,如果偏移地址超过了段界限,CPU 会抛出异常。

此外, 扩充的存储器分段管理机制和可选的存储器分页管理机制,有如下好处:

  • 不仅为存储器共享和保护提供了硬件支持,而且为实现虚拟存储器提供了硬件支持。
  • 支持多任务,能够快速地进行任务切换(switch)和保护任务环境(context)。
  • 4 个特权级和完善的特权检查机制,既能实现资源共享又能保证代码和数据的安全和保密及任务的隔离。
  • 支持虚拟 8086 方式,便于执行 8086 程序。

1.4.4 延伸知识:段选择子、特权级细节

这一小节会对段选择子及特权级的细节进行总结。

特权级
  • 通过提供 4 个特权级和完善的特权检查机制,既能实现资源共享又能保证代码和数据的安全和保密及任务的隔离。
  • 在保护模式下,特权级总共有 4 个,编号从 0(最高特权)到 3(最低特权)。有 3 种主要的资源受到保护:内存,I/O 地址空间以及执行特殊机器指令的能力。Linux 只用 0 级和 3 级,分别称之为内核态和用户态。
  • 在任一时刻,80386 CPU 都是在一个特定的特权级下运行的,从而决定了代码可以做什么,不可以做什么。这些特权级经常被称为为保护环(protection ring),最内的环(ring 0)对应于最高特权 0,最外面的环(ring 3)一般给应用程序使用,对应最低特权 3。
  • 在保护模式下,可以通过查看 CS 寄存器的低 2 位来了解进程的当前特权级(CPL)。
CPL、RPL 与 DPL
  • CPL 是当前进程的权限级别(Current Privilege Level),是当前正在执行的代码所在的段的特权级,存在于 CS 寄存器的低 2 位

  • RPL 说明的是进程对段访问的请求权限(Request Privilege Level),它存在于段选择子的低 2 位,每个段选择子有自己的 RPL。它说明的是进程对段访问的请求权限,有点像函数参数。而且 RPL 对每个段来说不是固定的,两次访问同一段时的 RPL 可以不同。RPL 可能会削弱 CPL 的作用,例如当前 CPL=0 的进程要访问一个数据段,它把段选择子中的 RPL 设为 3,这样它对该段只有特权为 3 的访问权限。

    RPL 引入的目的是避免低特权级的程序访问高特权级的资源。

  • DPL 存储在段描述符中,规定访问该段所需的权限级别(Descriptor Privilege Level),每个段的 DPL 是固定的。

  • 当进程访问一个段时,即处理器将段选择子载入段寄存器之前,要进行特权级检查,比较当前运行的进程或任务的特权级(CPL),段选择子的 RPL,还有该段的段描述符的 DPL。一般要求在数值上: CPL <= DPL && RPL <= DPL(注意,数字越大特权级越低),满足此条件时处理器会将段选择子装载入段寄存器,否则不会装载到段寄存器。

段选择子的存储布局
  • Index:段寄存器的高 13 位存放的就是 GDT 的 Index。
  • Table Indicator(TI):Index 后面的 1 位,用来指示描述符表的类别,0 表示 GDT,1 表示 LDT(Local Descriptor Table)。
  • Requested Privilege Level(RPL):低 2 位存放的是进程对段访问的请求特权级。
段选择子(Segment Selector)

它是这样定义的:

A segment selector is a 16-bit identifier for a segment . It does not point directly to the segment, but instead points to the segment descriptor that defines the segment.

也就是前面说的,段选择子是一个 16 位的段标识符,它不直接指向段,而是指向 GDT 中定义该段的段描述符。此外还需要理解:

  • 处理器一共提供了 6 个段寄存器来保存段选择子。
  • 当一个进程要访问某个段的时候,这个段的段选择子必须被赋值到某一个段寄存器中。因此,尽管系统定义了数千个段,只有 6 个段是可以被直接使用的。其他段只有在它们的段选择子被置入这些寄存器中时才可以被使用。
  • 对任何程序的执行而言,至少要将代码段寄存器(CS),数据段寄存器(DS)和堆栈段寄存器(SS)赋予有效的段选择子。此外,处理器还提供了另外 3 个数据段寄存器(ES,FS 和 GS)供进程使用。

也就是说:

  • 在同一时刻程序中可以有多个段选择子,也就是可以有多个 RPL,然而只有 CS 寄存器(也就是存放正在执行的代码的寄存器)中的 RPL才等于 CPL。

CS 段寄存器指向的是 CPU 中当前运行的指令,所以 CS 的 RPL 位称为当前特权级 CPL。所以得出结论:CS.RPL 的值 = CPL 的值。

1.5 指令指针寄存器

32 位 CPU 把指令指针扩展到 32 位,并记作 EIP,EIP 的低 16 位与先前 16 位 CPU 中的 IP 作用相同。

指令指针 EIP、IP(InstructionPointer)是存放下次将要执行的指令在代码段的偏移量。在具有预取指令功能的系统中,下次要执行的指令通常已被预取到指令队列中,除非发生转移情况。所以,在理解它们的功能时,不考虑存在指令队列的情况。

在实模式下,由于每个段的最大范围为 64K,所以,EIP 中的高 16 位肯定都为 0,此时,相当于只用其低 16 位的 IP 来反映程序中指令的执行次序。

1.6 标志寄存器

以下是运算结果标志位。

1.6.1 进位标志 CF(CarryFlag)

进位标志 CF 主要用来反映运算是否产生进位或借位。如果运算结果的最高位产生了一个进位或借位,那么,其值为 1,否则其值为 0。

使用该标志位的情况有:多字(字节)数的加减运算,无符号数的大小比较运算,移位操作,字(字节)之间移位,专门改变 CF 值的指令等。

1.6.2 奇偶标志 PF(ParityFlag)

奇偶标志 PF 用于反映运算结果中“1”的个数的奇偶性。如果“1”的个数为偶数,则 PF 的值为 1,否则其值为 0。

利用 PF 可进行奇偶校验检查,或产生奇偶校验位。在数据传送过程中,为了提供传送的可靠性,如果采用奇偶校验的方法,就可使用该标志位。

1.6.3 辅助进位标志 AF(AuxiliaryCarryFlag)

在发生下列情况时,辅助进位标志 AF 的值被置为 1,否则其值为 0:

  1. 在字操作时,发生低字节向高字节进位或借位时。
  2. 在字节操作时,发生低 4 位向高 4 位进位或借位时。

1.6.4 零标志 ZF(ZeroFlag)

零标志 ZF 用来反映运算结果是否为 0。如果运算结果为 0,则其值为 1,否则其值为 0。在判断运算结果是否为 0 时,可使用此标志位。

16.5 符号标志 SF(SignFlag)

符号标志 SF 用来反映运算结果的符号位,它与运算结果的最高位相同。在微机系统中,有符号数采用补码表示法,所以,SF 也就反映运算结果的正负号。运算结果为正数时,SF 的值为 0,否则其值为 1。

1.6.6 溢出标志 OF(OverflowFlag)

溢出标志 OF 用于反映有符号数加减运算所得结果是否溢出。如果运算结果超过当前运算位数所能表示的范围,则称为溢出,OF 的值被置为 1,否则,OF 的值被清为 0。

对以上 6 个运算结果标志位,在一般编程情况下,标志位 CF、ZF、SF 和 OF 的使用频率较高,而标志位 PF 和 AF 的使用频率较低。

2. X86 平台启动过程

2.1 读取第一条指令

在计算机通电后,寄存器在初始状态下会有一个缺省值。

CS(16 位的段寄存器的基址) 和 EIP(段内偏移) 结合在一起形成了启动后的第一条地址,CPU 会从该地址获取指令来执行。

上面有介绍,X86 为了向下兼容 8086,刚启动时是处于 16 位的实模式,会按照实模式的寻址方式来寻址:

CS: IP 的值:: IP 表示 CS 左移了 4 位再加上 IP 地址。

CS:16 位

EIP:16 位

所以第一条地址的实际值如下:

  • CS = F000H,EIP = 0000FFF0H

  • 实际地址是:

    Base + EIP = FFFF0000H + 0000FFF0H = FFFFFFF0H,这就是 BIOS 的 EPROM(Erasable Programmable Read Only Memory)所在地址。

  • 当 CS 被新值加载,地址转换规则将开始起作用。

  • 通常第一条指令是一条长跳转指令(这样 CS 和 EIP 都会更新),会跳转到 BIOS 代码中做初始化工作

2.2 BIOS 所做的工作

BIOS 主要做一些硬件的初始化工作,完成一些关键硬件的自检,随后:

  • BIOS 会加载存储设备的第一个扇区(Master Boot Record 即主引导扇区,简写为 MBR)中的 512 字节内容读取到内存的固定地址 0x7c00 处。
  • 然后 IP 寄存器地址会跳转到 0x7c00 这个地址,使得 CPU 可以执行 MBR 中的的代码,即 Bootloader 的代码。
  • Bootloader 会完成操作系统的进一步加载。

2.3 Bootloader 所做的工作

Bootloader 所做的工作,简单说就是进入保护模式,加载操作系统到内存,如下:

  • 从16 位寻址空间的实模式切换到32 位寻址空间的保护模式(即从 1MB 到 4GB),为后续操作系统的执行做准备,同时段机制(Segment-Level Protection)也被加载。
  • 从硬盘中 MBR 后面的扇区读取操作系统 kernel 代码,加载到内存中固定位置。
  • 根据 CS 寄存器中 Index 值来找到操作系统所在内存的起始地址,即操作系统的入口点(entry point),来把控制权交给了操作系统。

这样 Bootloader 就完成了操作系统的加载工作。

当然这还没完,仅仅是把操作系统加载到内存中,接下来终于要轮到操作系统登场了。

在总结完后看到条 2017 年的新闻:Intel 已经决心在 2020 年之前,彻底淘汰 PC BIOS,全面向 UEFI 固件过渡,而这也意味着一个时代的终结。

继任者叫 UEFI。

3. 参考

  1. 段选择符 段寄存器
  2. 通用32位CPU 常用寄存器及其作用
  3. CPL RPL与DPL 之间的区别和联系

Context 包解析

Context 用来管理 goroutine 的上下文,包括传递请求作用域的值、退出信号和处理一个请求时涉及的所有 goroutine 的截止时间等。

代码解析

// Context 的方法可以由多个协程同时调用
type Context interface {
    // Deadline 返回代表这个 context 的工作取消的时间
    // 当没有设置截止时间时,ok==false
    // 连续调用 Deadline 时只有第一次生效,后面都返回同样的结果
	Deadline() (deadline time.Time, ok bool)
    
    // Done 返回一个关闭的 channel,代表这个 context 的工作应该被取消。如果这个 context 永远不能取消则应该返回 nil。连续调用时只有第一次生效,后面都返回同样的结果
	Done() <-chan struct{}

	Err() error

	Value(key interface{}) interface{}
}

// 当 context 退出时 Context.Err 返回的错误
var Canceled = errors.New("context canceled")

// 当过了 context 的截止时间后,Context.Err 返回的错误
var DeadlineExceeded error = deadlineExceededError{}

type deadlineExceededError struct{}

// 没有接收器名称的方法,这里用的是非指针的 deadlineExceededError。为什么???
func (deadlineExceededError) Error() string   { return "context deadline exceeded" }
func (deadlineExceededError) Timeout() bool   { return true }
func (deadlineExceededError) Temporary() bool { return true }

// 永远不退出,没有值,没有截止时间。它不是结构体,因此这个类型的变量必须有确定的地址
type emptyCtx int

// 方法的接收器都是指针类型
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}

func (e *emptyCtx) String() string {
	switch e {
	case background:
		return "context.Background"
	case todo:
		return "context.TODO"
	}
	return "unknown empty Context"
}

var (
	background = new(emptyCtx) // 全局唯一的地址
	todo       = new(emptyCtx)
)

// 实际是 emptyCtx,通常用于 main 函数、初始化和测试
func Background() Context {
	return background
}

// 也是 emptyCtx,当不清楚用那种 context 或还没有可用的 context 时使用
func TODO() Context {
	return todo
}

// 定义一个函数类型,调用时立即停止某项操作,只在第一次调用时生效,后续调用什么都不做
type CancelFunc func()

// 复制一份 parent context,此副本作为 parent 的 child 并遵循其退出行为。
// 当调用 cancel 函数时或 parent 的 Done channel 关闭时,ctx 的 Done channel 将关闭,无论哪一个先发生。
// 返回 context 的副本和一个用来将此副本从 parent 的 children 中移除并退出的函数
// 应当在运行在此 context 中的工作完成时执行此 CancelFunc,context 退出时将释放与之相关的资源
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) } // 返回的还是地址,interface 可以是指针
}

// 返回一个初始化的 cancelCtx
func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

// 判断 parent 是否退出,其 child 也遵循 parent 的退出行为
func propagateCancel(parent Context, child canceler) {
	if parent.Done() == nil {
		return // 表示 parent 实际是 emptyCtx,永不退出
	}
	if p, ok := parentCancelCtx(parent); ok { // 判断 parent 是不是 cancelCtx 类型
		p.mu.Lock()
		if p.err != nil {
			// p.err 有值,说明 parent 已经退出,child 也将退出,但不从其 parent 的 children 中移除
			child.cancel(false, p.err)
		} else {
              // 如果 parent 没有退出且还没有 child,则将此 child 作为其 child
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{} // 值不重要,只是一个空 struct
		}
		p.mu.Unlock()
	} else {
         // 如果 parent 没有退出,启动协程等待 parent 和 child 退出
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}

// 循环检测 parent 的实际类型,直到发现是 *cancelCtx 时返回其指针和 true
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
	for {
		switch c := parent.(type) {
		case *cancelCtx: // 和上面 WithCancel 一样,这里也是指针
			return c, true
		case *timerCtx:
			return &c.cancelCtx, true
		case *valueCtx:
			parent = c.Context // 继续循环
		default:
			return nil, false // 上面的 case 都没有匹配到时,说明是 background 或 TODO类型,直接返回
		}
	}
}

// 从 parent 中移除 child
func removeChild(parent Context, child canceler) {
	p, ok := parentCancelCtx(parent) // 判断 parent 类型
	if !ok {
		return
	}
	p.mu.Lock()
	if p.children != nil {
		delete(p.children, child)
	}
	p.mu.Unlock()
}

// 是 context 类型,可以直接退出。*cancelCtx 和 *timerCtx 是其具体实现
type canceler interface {
	cancel(removeFromParent bool, err error)
	Done() <-chan struct{}
}

// 可重复使用的关闭的通道
var closedchan = make(chan struct{})

func init() {
	close(closedchan) // 一开始就关闭 closedchan
}

// cancelCtx 可以退出,当退出时,它也取消其任何实现了 canceler 的 children
type cancelCtx struct {
	Context

	mu       sync.Mutex            // 保护以下字段
	done     chan struct{}         // created lazily, 通过第一个取消调用来关闭
	children map[canceler]struct{} // 通过第一个取消调用将其设置为 nil
	err      error                 // 通过第一个取消调用将其设置为非 nil
}

func (c *cancelCtx) Done() <-chan struct{} {
	c.mu.Lock()
	if c.done == nil {
		c.done = make(chan struct{})
	}
	d := c.done
	c.mu.Unlock()
	return d
}

func (c *cancelCtx) Err() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.err
}

func (c *cancelCtx) String() string {
	return fmt.Sprintf("%v.WithCancel", c.Context)
}

// 这个函数关闭 c.done,退出其每个 child,如果 removeFromParent 是 true,
// 则将 c 从其 parent 的 children 中移除
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // 说明已经退出过,直接返回
	}
	c.err = err
    // 如果没有 done,则将关闭的 channel 赋值给它,如果有则关闭
	if c.done == nil {
		c.done = closedchan
	} else {
		close(c.done)
	}
    // 将 c 的 children 都退出,注意这里的实参 false 和上面的 removeFromParent 没有关系,
    // 因为 removeFromParent 是用来决定是否将 c 从其 parent 的 children 中移除
	for child := range c.children {
        // 当持有 parent 的锁时也就获取了 child 的锁,可以直接操作
		child.cancel(false, err)
	}
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
		removeChild(c.Context, c)
	}
}

// 返回 parent 的一个副本,其截止时间是 deadline。如果 parent 的截止时间比 deadline 早,
// 则新 context 在语义上等同于 parent。当时间过了 deadline、cancel 函数被调用、parent 的
// Done channel 关闭时,新 context 的 Done channel 将被关闭,无论哪种情况先发生。
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
	if cur, ok := parent.Deadline(); ok && cur.Before(deadline) {
		// 只有 timerCtx 的 Deadline 返回 true,如果 parent 的截止时间早于 deadline,
        // 直接返回带有 cancel 函数的 context
		return WithCancel(parent)
	}
	c := &timerCtx{
		cancelCtx: newCancelCtx(parent),
		deadline:  deadline,
	}
	propagateCancel(parent, c) // 遵循 parent 的退出行为
	d := time.Until(deadline)
	if d <= 0 {
         // 已经过了 deadline,将 c 退出并从其 parent 的 children 中移除
		c.cancel(true, DeadlineExceeded)
		return c, func() { c.cancel(true, Canceled) }
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.err == nil {
         // 设置退出时间,超时后 c 将从其 parent 的 children 中移除,可在此时间之前撤销此操作
		c.timer = time.AfterFunc(d, func() {
			c.cancel(true, DeadlineExceeded)
		})
	}
	return c, func() { c.cancel(true, Canceled) }
}

// 它通过停止 timer 来实现退出操作,然后将退出操作委托给 cancelCtx.cancel。
// It implements cancel by stopping its timer then delegating to cancelCtx.cancel.
type timerCtx struct {
	cancelCtx // timerCtx 具有 cancelCtx 的全部字段和方法
	timer *time.Timer // 从属于 cancelCtx.mu

	deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
	return c.deadline, true
}

func (c *timerCtx) String() string {
	return fmt.Sprintf("%v.WithDeadline(%s [%s])", c.cancelCtx.Context, c.deadline, time.Until(c.deadline))
}

func (c *timerCtx) cancel(removeFromParent bool, err error) {
     // c 的 parent 退出后,parent 的 child 也会退出
	c.cancelCtx.cancel(false, err)
	if removeFromParent {
		// 将 c 从其 parent 的 children 中移除
		removeChild(c.cancelCtx.Context, c)
	}
    // 如果有计时器,停止计时器
	c.mu.Lock()
	if c.timer != nil {
		c.timer.Stop()
		c.timer = nil
	}
	c.mu.Unlock()
}

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
	return WithDeadline(parent, time.Now().Add(timeout))
}

// 返回带有键值对的 parent 的副本。
// context 值仅用于传输处理和 API 的请求域,而不是用于给函数传递可选参数。
// 提供的 key 必须是可比较的,并且不能是 string 等内置类型,以避免包与 context 之间的冲突。
// 使用此函数的用户应当为 key 定义用户自己的类型。
// 当分配给一个 interface{} 时,要避免重新分配内存,context 的 key 通常使用具体类型 struct{}。
// 或者,对外暴露的 context key 变量的静态类型应该是一个指针或 interface。
func WithValue(parent Context, key, val interface{}) Context {
	if key == nil {
		panic("nil key")
	}
	if !reflect.TypeOf(key).Comparable() {
		panic("key is not comparable")
	}
	return &valueCtx{parent, key, val}
}

// valueCtx 带有一个键值对。它实现了该键的值,并且将所有其它调用都委托给嵌套的 Context。
type valueCtx struct {
	Context
	key, val interface{}
}

func (c *valueCtx) String() string {
	return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}

Tendermint 中验证人节点的添加

添加验证人的两种方式

添加 Tendermint 验证人有两种方式:

  • 在启动 Tendermint 网络前,在 genesis.json 中进行操作。可以创建一个新的 priv_validator.json 文件,然后把里面的 pub_key 拷贝到 genesis.json 文件中。
  • 在一个运行中的 Tendermint 网络中,通过 ABCI 应用的 EndBlock 方法添加验证人。

前面的文档已经对第一种方式进行了说明,这里讨论第二种方式,即如何在运行中的 Tendermint 网络中添加验证人节点。

启动 Tendermint 前添加

最简单的方式是在启动 Tendermint 网络前,在 genesis.json 中进行操作。可以创建一个新的 priv_validator.json 文件,然后把里面的 pub_key 拷贝到 genesis.json 文件中。

执行这条命令来生成 priv_validator.json

tendermint gen_validator

现在可以更新 genesis 文件了。比如新的 priv_validator.json 长这样:

{
  "address" : "5AF49D2A2D4F5AD4C7C8C4CC2FB020131E9C4902",
  "pub_key" : {
    "value" : "l9X9+fjkeBzDfPGbUM7AMIRE6uJN78zN5+lk5OYotek=",
    "type" : "AC26791624DE60"
  },
  "priv_key" : {
    "value" : "EDJY9W6zlAw+su6ITgTKg2nTZcHAH1NMTW5iwlgmNDuX1f35+OR4HMN88ZtQzsAwhETq4k3vzM3n6WTk5ii16Q==",
    "type" : "954568A3288910"
  },
  "last_step" : 0,
  "last_round" : 0,
  "last_height" : 0
}

然后新的 genesis.json 将长这样:

{
  "validators" : [
    {
      "pub_key" : {
        "value" : "h3hk+QE8c6QLTySp8TcfzclJw/BG79ziGB/pIA+DfPE=",
        "type" : "AC26791624DE60"
      },
      "power" : 10,
      "name" : ""
    },
    {
      "pub_key" : {
        "value" : "l9X9+fjkeBzDfPGbUM7AMIRE6uJN78zN5+lk5OYotek=",
        "type" : "AC26791624DE60"
      },
      "power" : 10,
      "name" : ""
    }
  ],
  "app_hash" : "",
  "chain_id" : "test-chain-rDlYSN",
  "genesis_time" : "0001-01-01T00:00:00Z"
}

更新本机 ~/.tendermint/config 目录中的 genesis.json。把 genesis 文件和新的 priv_validator.json 拷贝到新机器上的 ~/.tendermint/config 目录中。

现在在所有机器上执行 tendermint node,使用 --p2p.persistent_peers/dial_peers 来让它们互为 peer。他们应该开始生成区块,只要他们都在线就会继续生成区块。

要让 Tendermint 网络可以容忍其中一个验证人失败,至少需要四个验证人节点(> 2/3)。

启动 Tendermint 后添加

EndBlock 方法的定义

EndBlock 是 abci 中 Application 接口中定义的一个方法:

type Application interface {
	// Info/Query Connection
	Info(RequestInfo) ResponseInfo                // Return application info
	SetOption(RequestSetOption) ResponseSetOption // Set application option
	Query(RequestQuery) ResponseQuery             // Query for state

	// Mempool Connection
	CheckTx(tx []byte) ResponseCheckTx // Validate a tx for the mempool

	// Consensus Connection
	InitChain(RequestInitChain) ResponseInitChain    // Initialize blockchain with validators and other info from TendermintCore
	BeginBlock(RequestBeginBlock) ResponseBeginBlock // Signals the beginning of a block
	DeliverTx(tx []byte) ResponseDeliverTx           // Deliver a tx for full processing
	EndBlock(RequestEndBlock) ResponseEndBlock       // Signals the end of a block, returns changes to the validator set
	Commit() ResponseCommit                          // Commit the state and return the application Merkle root hash
}

它可以用来在每个区块结束时运行一些代码,此外,应答可以包含验证人列表,可以用来更新验证人集合。要添加新验证人或更新现有验证人,只需将它们包括在 EndBlock 应答返回的列表中即可。要移除一个验证人,将其 power 设为 0 并放入此列表。Tendermint 将负责更新验证人集合。

注意,如果希望轻客户端能够从外部证明状态的转换,则投票权重的变化必须严格小于每个区块的 1/3。参考 这篇文档 来查看它如何追踪验证人。

Tendermint 源码中如何进行验证人的更新

相关代码在 tendermint/state/execution.go 第 315 行。

执行 ApplyBlock 方法时会执行 updateState 方法,它会根据执行 execBlockOnProxyApp 返回的应答来更新状态:

// update the validator set with the latest abciResponses
lastHeightValsChanged := state.LastHeightValidatorsChanged
if len(abciResponses.EndBlock.ValidatorUpdates) > 0 {
   err := updateValidators(nextValSet, abciResponses.EndBlock.ValidatorUpdates)
   if err != nil {
      return state, fmt.Errorf("Error changing validator set: %v", err)
   }
   // change results from this height but only applies to the next height
   lastHeightValsChanged = header.Height + 1
}

execBlockOnProxyApp 方法内部会调用开发者定义的 ABCI 应用的 BeginBlockDeliverTxEndBlock 方法。

ABCI 应用中如何实现验证人的更新

处理逻辑就是在由客户端向 ABCI 应用提交交易时,在 DeliverTx 方法中进行验证人的更新。当 Tendermint 在 ApplyBlock 方法中应用区块时,会调用此方法。

这里更新验证人的交易格式为 val:pubkey/power,看一下它的具体实现:

// tx is either "val:pubkey/power" or "key=value" or just arbitrary bytes
func (app *PersistentKVStoreApplication) DeliverTx(tx []byte) types.ResponseDeliverTx {
	// if it starts with "val:", update the validator set
	// format is "val:pubkey/power"
	if isValidatorTx(tx) {
		// update validators in the merkle tree
		// and in app.ValUpdates
		return app.execValidatorTx(tx)
	}

	// otherwise, update the key-value store
	return app.app.DeliverTx(tx)
}

execValidatorTx 方法会在数据库及 app 的 ValUpdates 字段中更新验证人:

func (app *PersistentKVStoreApplication) execValidatorTx(tx []byte) types.ResponseDeliverTx {
	tx = tx[len(ValidatorSetChangePrefix):]

	//get the pubkey and power
	pubKeyAndPower := strings.Split(string(tx), "/")
	if len(pubKeyAndPower) != 2 {
		return types.ResponseDeliverTx{
			Code: code.CodeTypeEncodingError,
			Log:  fmt.Sprintf("Expected 'pubkey/power'. Got %v", pubKeyAndPower)}
	}
	pubkeyS, powerS := pubKeyAndPower[0], pubKeyAndPower[1]

	// decode the pubkey
	pubkey, err := hex.DecodeString(pubkeyS)
	if err != nil {
		return types.ResponseDeliverTx{
			Code: code.CodeTypeEncodingError,
			Log:  fmt.Sprintf("Pubkey (%s) is invalid hex", pubkeyS)}
	}

	// decode the power
	power, err := strconv.ParseInt(powerS, 10, 64)
	if err != nil {
		return types.ResponseDeliverTx{
			Code: code.CodeTypeEncodingError,
			Log:  fmt.Sprintf("Power (%s) is not an int", powerS)}
	}

	// update
	return app.updateValidator(types.Ed25519Validator(pubkey, int64(power)))
}

References

Golang 中的指针

指针的定义

A pointer is a value that points to the memory address of another variable.

指针是一个指向另一个变量内存地址的值。

示例 1

package main

import (
	"fmt"
)

func main() {
	a := 200
	b := &a

	fmt.Println("a -->", a)
	fmt.Println("*b -->", *b)
	fmt.Println("&a -->", &a)
	fmt.Println("b -->", b)

	*b++

	fmt.Println("a -->", a)
	fmt.Println("*b -->", *b)
}
  • 变量 a 的值是 200,变量 b 的值就是变量 a 的内存地址。b 就是一个指针,它指向 a 所在的内存地址。
  • *b++ 是对变量 b 指向的内存地址取值后再对该值加 1,即变量 a 的值加 1,执行结果如下:
a --> 200
*b --> 200
&a --> 0xc4200160b0
b --> 0xc4200160b0
a --> 201
*b --> 201

示例 2

package main

import (
	"fmt"
)

type packet struct {
	id   int
	size int
}

var (
	p packet
	q *packet
)

func main() {
	p = packet{
		id:   1,
		size: 2,
	}

	q = &p

	fmt.Println("p -->", p)
	fmt.Println("q -->", q)
	fmt.Println("*q -->", *q)
	fmt.Println("&p -->", &p)
	fmt.Println("&q -->", &q)
	fmt.Println("p.size -->", p.size)
	fmt.Println("(*q).size -->", (*q).size)
	fmt.Println("q.size -->", q.size)
}
  • 首先定义了一个结构类型 packet,又声明了两个变量 p 和 q,p 是一个普通 packet 结构,q 则是一个 packet 类型的指针。

  • 初始化 p,q 的值是 p 的地址,即 q 是一个指向 p 内存地址的指针。

  • 执行结果如下:

    p --> {1 2}
    q --> &{1 2}
    *q --> {1 2}
    &p --> &{1 2}
    &q --> 0x113e040
    p.size --> 2
    (*q).size --> 2
    q.size --> 2
    • 从执行结果可以看出,q 的值是对 p 进行取地址操作。
    • *q 是对 q 指向的内存地址取值,即 p 的值。
    • &q 是指针 q 所在的内存地址。
    • 这里需要注意的是,(*q).sizeq.size 结果一样。
      • (*q).size 是先对 q 所指的内存地址取值,即 p 的值,p 的 size 字段值为 2。

      • q.size 是直接对 q 指向的内存地址的 size 字段取值,结果与上面一样。

参考资料:Understand Go pointers in less than 800 words or your money back

更改 CentOS 7 默认的 SSH 端口

本文是对 How To Change OpenSSH Port On CentOS 7 的简单翻译,部分内容有更改。

一些网站管理员认为,更改默认为 22 的 SSH 端口号可以增强安全性。这是因为每个人都知道 SSH 默认端口号是 22,当然也包括黑客,所以是不安全的。

将 SSH 端口号更改为 22 以外的其他值将增强服务器的安全性,因为坏人不会知道 SSH 通信的端口。这是一个很酷的技巧,但不会阻止那些决心闯入你服务器的人。

只需使用简单的端口扫描程序或类似工具,黑客就可以找出服务器上的所有连接端口。这是一项古老的技术,可能在我们今天的时间里不适用。

在我看来,保护 SSH 服务器的最佳方法是使用证书和加密实现无密码登录。使用此方法,只允许已使用加密密钥的计算机使用 SSH 协议登录。

另一种方法是通过将该计算机的 IP 地址放到防火墙白名单中,来将防火墙配置为仅允许来自预定义计算机的 SSH 连接。其他任何东西都不会更好地增强您的服务器安全性。

以下是更改步骤。

  1. 首先,备份 SSH 配置文件

    sudo cp /etc/ssh/sshd_config /etc/ssh/sshd_config.bak
  2. 开始修改配置文件

    sudo vi /etc/ssh/sshd_config

    修改内容,可以把端口修改为你想要修改的值:

    这里可以在保留 22 端口的情况下,先增加一个新端口,以确保可以通过新端口连接,然后再禁用掉 22 端口。

    # If you want to change the port on a SELinux system, you have to tell
    # SELinux about this change.
    # semanage port -a -t ssh_port_t -p tcp #PORTNUMBER
    #
    Port 22
    Port 2244
    #AddressFamily any
    #ListenAddress 0.0.0.0
    #ListenAddress ::

    保存文件,在完成修改之前,不要退出登录。

  3. 默认情况下,SELinux 只允许端口 22 用于 SSH,现在需要做的是通过 SELinux 启用新创建的端口。执行以下命令:

    sudo semanage port -a -t ssh_port_t -p tcp 2244

    如果执行以上命令后报错:semanage command not found,执行这条命令来安装 semanage

    sudo yum -y install policycoreutils-python

    安装成功后,再次执行第一条命令,通过 SELinux 来允许新端口。

  4. 执行以下命令允许新端口通过防火墙。

    sudo firewall-cmd --permanent --zone=public --add-port=2244/tcp

    如果报错:FirewallD is not running,表示防火墙还没有开启,执行以下命令来开启防火墙:

    systemctl enable firewalld
    systemctl start firewalld

    然后查看防火墙状态:

    systemctl status firewalld
    
    # 或者以下命令
    firewall-cmd --state
  5. 重新加载防火墙配置。

    sudo firewall-cmd --reload
  6. 重启 SSH 服务。

    sudo systemctl restart sshd.service

    查看 sshd 状态,看添加的端口是否生效。

    systemctl status sshd.service
  7. 或者通过运行以下命令验证 SSH 现在是否在新端口上运行。

    ss -tnlp | grep ssh
  8. 退出并尝试使用新端口号登录。

    ssh [email protected] -p 2244
  9. 禁用 22 端口。

  10. 重启 SSH 服务。

另外,如果使用的是阿里云的 ECS,有一个安全组规则,可以设置入方向和出方向的端口。这里只要在入方向里面添加上面配置的新端口即可。

docker-compose 启动 MySQL 的问题记录

问题

在工作中使用 docker-compose 来启动 MySQL 容器时,由于没有配置字符集,出现以下问题:

  • 数据库里的中文记录在网页上无法正常显示,全部是乱码。
  • 修改配置文件后,在容器中打开 MySQL 终端后,修改记录时无法输入中文。

这是原来的 docker-compose.yaml 文件:

version: "2"

services:
  mysql:
    container_name: mysql-name
    image: mysql
    volumes:
      - ./mysql:/var/lib/mysql
    ports:
      - "127.0.0.1:3308:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=000000
    restart: always

解决过程

以交互模式进入容器:

docker exec -it containerName sh

进入 MySQL 终端:

mysql -u userName -p

选择数据库后,执行下面的命令查看字符集情况, 发现基本都是 latin1:

SHOW VARIABLES LIKE 'character_set_%';

SHOW VARIABLES LIKE 'collation_%';

参考网上的教程修改:

# 解决外部访问数据乱码问题
SET NAMES 'utf8';

# 上面这条命令相当于下面的三条命令
SET character_set_client = utf8;
SET character_set_results = utf8;
SET character_set_connection = utf8;

# 如果已经建立了数据库,可以通过以下语句修改字符集
alter database name character set utf8; # 修改数据库
alter table type character set utf8; # 修改表
alter table type modify type_name varchar(50) CHARACTER SET utf8; # 修改字段

# 修改配置文件,进入容器后找到:etc/mysql/mysql.conf.d/mysqld.cnf
[mysql]
default-character-set = utf8

[mysql.server]
default-character-set = utf8
 
[mysqld_safe]
default-character-set = utf8
 
[client]
default-character-set = utf8
 
[mysqld]
default-character-set = utf8
character_set_server = utf8 

执行以上命令后,再查看字符集,发现大部分都修改为 utf-8 了,在网页端的中文也能正常显示了。

如果想要在 MySQL 终端中能够输入中文,还需要按下面的方式进入终端:

LANG=C.UTF-8 mysql -u username -p

新的问题

在停止容器后再启动时,出现容器一直处于 restarting 状态的情况,执行这条命令来查看指定容器日志:

docker logs --tail 50 --follow --timestamps containerName

发现错误:unknown variable 'default-character-set=utf8'

这时因为无法进入容器来修改 MySQL 的配置文件,只能先删除容器。

修改配置文件:新添加的内容中,删除 [mysqld] 标签下的 default-character-set=utf8 ,保留 character_set_server=utf8 ,删除其它标签及内容。

[mysqld]
character_set_server = utf8 

通过挂载 volume 的方式来使用自定义的配置文件,修改好后的 docker-compose.yaml 文件:

version: "2"

services:
  mysql:
    container_name: mysql-name
    image: mysql
    volumes:
      - ./mysql:/var/lib/mysql
      - ./mysql/conf:/etc/mysql/mysql.conf.d
    ports:
      - "127.0.0.1:3308:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=000000
      - LANG=C.UTF-8
    restart: always

一个 docker-compose.yaml 中可能有多个服务,可以通过这条命令来启动指定服务:

docker-compose up -d mysql

这时再查看字符集,除了 character_set_filesystem 外已全部是 utf-8,进入终端时不指定 LANG 也能输入中文。

TCP 断开连接时 TIME_WAIT 存在的原因

TIME_WAIT 是主动关闭 TCP 连接一方在发出最后一个 ACK 后需要等待的状态,时长为 2MSL。TIME_WAIT 状态的存在主要有以下原因。(因为 TCP 连接的断开一般是由客户端发起关闭连接的操作,所以为了方便描述,后文把主动关闭方称为 C 端(Client),被动关闭方称为 S 端(Server)。)

1. C 端要确认 S 端是否有收到此 ACK

因为 TCP 连接是全双工的,因此断开连接需要双方连接都关闭。整个断开连接的过程简要描述如下:

  1. 过程一:C 端发送 FIN,S 端收到后发送该 FIN 的 ACK;
  2. 过程二:S 端发送 FIN,C 端收到后发送该 FIN 的 ACK;
  3. 过程三:S 端收到 ACK。

要实现 TCP 全双工连接的正常终止,必须处理终止过程中四个报文中任何一个报文的丢失情况 。完成以上三个过程,才说明 TCP 全双工连接已经断开。

在过程三后,S 端就可以 100% 确认连接已经可以关闭,因此它便可以直接进入 CLOSED 状态了。然而对于 C 端来说,它无法确定最后发给 S 端的那个 ACK 是否已经被收到。根据 TCP 协议规范,不对 ACK 进行 ACK,因此它不可能再收到 S 端的 ACK 了。

那么在这里就陷入了僵局,TCP 连接的主动关闭方如何来保证整个断开连接过程的闭合?这时协议外的东西就起作用了。

TCP 报文段有一个超时值,即 MSL(Maximum Segment Lifetime),它是 TCP 报文段在网络上存在的最长时间,超过这个时间报文将被丢弃。MSL 在 RFC 1122 上建议是 2 分钟,而源自 berkeley 的 TCP 实现传统上使用 30 秒。TIME_WAIT 状态维持时间是 2MSL 时间长度,也就是在 1-4 分钟。

这类超时值非常重要,因为它们给出了一个物理意义上的不可逾越的界限,它们是自洽协议的唯一外部输入。

综上,如果这个 ACK 丢失,S 端将重发出最终的 FIN,因此 C 端必须要维持 TIME_WAIT 状态,以允许它重发最终的 ACK。如果 C 端不维持 TIME_WAIT 状态,而是在发出最终的 ACK 后就转为 CLOSED 状态,那么 C 端将响应 RST 报文,S 端收到后将此解释成一个错误(RST 表示复位,用来异常的关闭连接)。

2. TIME_WAIT 的时间为什么是 2MSL

等待 2MSL 时间主要目的是怕 S 端没收到最后一个 ACK,那么 S 端将在超时后重发第三次握手的 FIN 包,C 端接到重发的 FIN 包后可以再发一个 ACK 应答包。

分析 TIME_WAIT 时长是 2MSL 的原因:

  • 假设 S 端在 C 端发出 ACK 包 1MSL 时间后还没有收到此包,则需要重发 FIN 包。此时过了第一个 MSL 时间。
  • 从过了第一个 MSL 时间开始算,如果再过 1MSL 时间 C 端还没有收到 S 端重发的 FIN 包,则说明 S 端收到了其发送的 ACK,这时就可以进入 CLOSED 状态了。

3. 处于 TIME_WAIT 状态的连接为什么不能启动一个新连接

TCP 报文可能由于路由器异常而“迷途”,在迷途期间,TCP 发送端可能因确认超时而重发这个报文,迷途的报文在路由器修复后也会被送到最终目的地,这个迟到的迷途报文到达时可能会引起问题。

在关闭前一个连接之后,马上又重新建立起一个相同的 IP 和端口之间的新连接,前一个连接的迷途重复分组在前一个连接终止后到达,而被新连接收到了。

为了避免这个情况,在 TIME_WAIT 状态下,上一次建立连接的套接字 (Socket) 将不可再重新启用,也就是同一个网卡 / IP 不可再建立同样端口号的连接,如果再重新创建系统将会报错。

要等待 TIME_WAIT 这个时间,也是为了避免有些报文段在网络上滞留,被对方收到的时候如果刚好又启用了一个完全一样的套接字,那么就会被认为是这个新连接的数据。因此为了让所有 “迷路” 的报文彻底消失后,才能启用相同的套接字。

4. 参考

  1. http://elf8848.iteye.com/blog/1739571
  2. https://en.wikipedia.org/wiki/Maximum_segment_lifetime

CentOS 7 安装 Docker CE

之前在公司的云主机上安装过,今天在自己的阿里云 ECS 上安装,还得再去翻看官方文档,为了方便记录,对 官方文档 安装部分进行了简单翻译。

虽然就几条命令,也可以扩展并深入。里面也提到了比较底层的部分,比如存储驱动 overlay2,有兴趣可以深入一下。

先决条件

  1. 维护版的 CentOS 7。
  2. centos-extras 必须是激活状态。默认是激活的,如果被关闭了,需要重新激活
  3. 现在推荐使用的存储驱动是 overlay2,之前是 aufs

卸载旧版本

旧版本的 Docker 叫做 dockerdocker-engine。如果有安装它们,执行以下命令来卸载它及其依赖项:

$ sudo yum remove docker \
                  docker-client \
                  docker-client-latest \
                  docker-common \
                  docker-latest \
                  docker-latest-logrotate \
                  docker-logrotate \
                  docker-selinux \
                  docker-engine-selinux \
                  docker-engine

保留 /var/lib/docker/ 的内容,包括镜像、容器、卷 和网络。Docker CE 的包目前叫做 docker-ce

安装 Docker CE

有几种不同的安装方式,这里介绍如何使用库安装。安装这些库之后,可以从这些库来安装和更新 Docker。

设置库

  1. 安装所需包。

    yum-utils 提供 yum-config-manager 工具,devicemapper 存储驱动还需要 device-mapper-persistent-datalvm2

    $ sudo yum install -y yum-utils \
      device-mapper-persistent-data \
      lvm2
  2. 以下命令安装稳定版本。注意,任何时候都需要稳定版本库,即使想安装 edge 或 test 版本的 docker。

    $ sudo yum-config-manager \
        --add-repo \
        https://download.docker.com/linux/centos/docker-ce.repo
  3. 可选:激活 edge 和 test 库。它们包含在 docker.repo 中,默认是关闭的。可以与稳定库一起激活:

    $ sudo yum-config-manager --enable docker-ce-edge
    
    $ sudo yum-config-manager --enable docker-ce-test

    可以用以下命令关闭 edge 版本(用 --enable 来激活):

    $ sudo yum-config-manager --disable docker-ce-edge

安装 Docker CE

  1. 安装最新版本:

    $ sudo yum install docker-ce

    如果提示接受 GPG 密钥,验证指纹是否匹配:060A 61C5 1B55 8A7F 742B 77AA C52F EB6B 621E 9F35,如果是,接受。

  2. 要安装指定版本的 Docker CE,在 repo 中列出可用版本,然后选择并安装:

    • 列出 repo 中可用的版本并进行排序。此示例按版本号对结果进行排序,从最高到最低,并截断:

      $ yum list docker-ce --showduplicates | sort -r
      
      docker-ce.x86_64            18.03.0.ce-1.el7.centos             docker-ce-stable
    • 通过其完全限定的包名称安装特定版本,包名称(docker-ce)加上版本字符串(第2列),用连字符分隔:

      $ sudo yum install docker-ce-<VERSION STRING>

      Docker 安装好了,但没有启动,docker 组已经创建,但没有用户添加到组。

  3. 启动 Docker:

    $ sudo systemctl start docker
  4. 通过运行 hello-world 来验证是否正确安装:

    $ sudo docker run hello-world

    此命令下载测试镜像并在容器中运行它。当容器运行时,会打印一条消息并退出。

现在 Docker CE 已经安装好并运行了。

更新

按上面步骤,指定版本,再来一遍。

用 kubeadm 在 CentOS 7 上部署 K8s 集群

本文记录在 Mac 上通过 kubeadm 在 VirtualBox 中的 CentOS 7 部署 K8s 集群的过程。

环境:

  • 本地系统:macOS 10.15.7
  • VirtualBox 版本:6.1.16 r140961
  • CentOS 镜像版本:CentOS-7-x86_64-DVD-2009.iso

1. 安装 CentOS 7

新建虚拟机:

  • 内存分配 2G;
  • 文件位置:Users/***/vm/kube1/kube1.vdi
  • 硬盘分配 20G。

虚拟机设置:

  • CPU 数量:2
  • 网卡模式:选择桥接网卡,混杂模式选全部允许,这样虚拟机可以访问外网,与宿主机也可以互相通信。

安装 CentOS 7:

  • 设置 CentOS 镜像:选择虚拟机 - 设置 - 存储 - 控制器:IDE 盘片 - 分配光驱 - 选择下载的镜像 - OK
  • 在 VB 界面点击启动:
    • 设置时区:Asia/Shanghai
    • SOFTWARE SELECTION:minimal install
    • INSTALLATION DESTINATION:Automatic partitioning
    • NETWORK & HOST NAME:打开 OnBoot,打开后外面显示为 Wired (enp0s3),Host name:kube0.vm(如果没有主机名设置的地方,可以在系统安装完成后用命令 hostnamectl 设置。
    • 安装过程中可以设置 root 用户密码或者添加新用户。

1.1 配置网卡为静态 IP

查看网卡信息:

> ip addr
enp0s3: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000

那么在系统中网卡设备名为:ifcfg-enp0s3

修改配置文件:

> vi /etc/sysconfig/network-scripts/ifcfg-enp0s3

TYPE=Ethernet                # 网卡类型:为以太网
PROXY_METHOD=none            # 代理方式:关闭状态
BROWSER_ONLY=no                # 只是浏览器:否
BOOTPROTO=dhcp                # 网卡的引导协议:DHCP[中文名称: 动态主机配置协议]
DEFROUTE=yes                # 默认路由:是, 不明白的可以百度关键词 `默认路由` 
IPV4_FAILURE_FATAL=no        # 是不开启IPV4致命错误检测:否
IPV6INIT=yes                # IPV6是否自动初始化: 是[不会有任何影响, 现在还没用到IPV6]
IPV6_AUTOCONF=yes            # IPV6是否自动配置:是[不会有任何影响, 现在还没用到IPV6]
IPV6_DEFROUTE=yes            # IPV6是否可以为默认路由:是[不会有任何影响, 现在还没用到IPV6]
IPV6_FAILURE_FATAL=no        # 是不开启IPV6致命错误检测:否
IPV6_ADDR_GEN_MODE=stable-privacy            # IPV6地址生成模型:stable-privacy [这只一种生成IPV6的策略]
NAME=ens33                    # 网卡物理设备名称
UUID=f47bde51-fa78-4f79-b68f-d5dd90cfc698    # 通用唯一识别码, 每一个网卡都会有, 不能重复, 否两台linux只有一台网卡可用
DEVICE=ens33                    # 网卡设备名称, 必须和 `NAME` 值一样
ONBOOT=no                        # 是否开机启动, 要想网卡开机就启动或通过 `systemctl restart network`控制网卡,必须设置为 `yes` 

需要修改 BOOTPROTO 和 ONBOOT,并添加三行内容:

TYPE="Ethernet"
PROXY_METHOD="none"
BROWSER_ONLY="no"
BOOTPROTO="static" # 设置网卡引导协议为静态
DEFROUTE="yes"
IPV4_FAILURE_FATAL="no"
IPV6INIT="yes"
IPV6_AUTOCONF="yes"
IPV6_DEFROUTE="yes"
IPV6_FAILURE_FATAL="no"
IPV6_ADDR_GEN_MODE="stable-privacy"
NAME="enp0s3"
UUID="cde5c905-2d8d-4ca6-965e-30a9a9d5036b"
DEVICE="enp0s3"
ONBOOT="yes" # 设置网卡启动方式为 开机启动 并且可以通过 systemctl 控制网卡
IPADDR=10.0.0.6 # IP 地址
NETMASK=255.255.255.0 # 子网掩码
GATEWAY=10.0.0.1 # 网关地址

重启网卡:

systemctl restart network

再查看 ip addr 会发现修改成功:

enp0s3: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000
    link/ether 08:00:27:17:46:85 brd ff:ff:ff:ff:ff:ff
    inet 10.0.0.6/24 brd 10.0.0.255 scope global noprefixroute enp0s3

2. 安装 Docker

安装过程见 00.CentOS 7 安装 Docker CE

开启

sudo yum install -y yum-utils

sudo yum-config-manager \
    --add-repo \
    https://download.docker.com/linux/centos/docker-ce.repo
   

安装最新版本:

sudo yum install docker-ce docker-ce-cli containerd.io

安装指定版本:

yum list docker-ce --showduplicates | sort -r

sudo yum install docker-ce-<VERSION_STRING> docker-ce-cli-<VERSION_STRING> containerd.io

启动 Docker:

sudo systemctl start docker

3. 安装 Kubernetes 各组件

准备工作:

  • 禁用 SELinux:

    • 临时关闭:命令行执行 setenforce 0
    • 永久关闭:修改 /etc/selinux/config 文件,将 SELINUX=enforcing 改为 SELINUX=disabled
  • 关闭防火墙:systemctl disable firewalld && systemctl stop firewalld

  • 关闭交换分区:swapoff -a && sed -i '/ swap / s/^/#/' /etc/fstab

  • 更改 iptables 设置(后面两台虚拟机也要更改):echo 1 > /proc/sys/net/bridge/bridge-nf-call-iptables

  • /etc/docker/daemon.json 中加入:

    {
        "registry-mirrors": ["https://registry.docker-cn.co"],
        "graph": "/mnt/docker-data",
        "storage-driver": "overlay"
    }
  • 安装 kubectl、kubeadm、kubelet:

    yum -y install kubectl kubeadm kubelet
  • 启动 Docker 和 kubelet(此时可能无法成功启动,在 kubeadm init 后会自动启动):

    systemctl enable docker && systemctl start docker
    systemctl enable kubelet && systemctl start kubelet

4. 复制多台虚拟机

clone 两台虚拟机,并命名为 kube1.vm、kube2.vm:

hostnamectl set-hostname kube1.vm
hostnamectl set-hostname kube2.vm

将三台虚拟机的 host 写入到宿主机和每一台虚拟机:

vi /etc/hosts  

# 追加以下内容(ip 自行替换)
10.0.0.6 kube0.vm
10.0.0.7 kube1.vm
10.0.0.8 kube2.vm

5. 初始化 Master 节点

在 kube0.vm 上执行 kubeadm init

由于你懂的的原因,国内可能需要指定库:

kubeadm init --image-repository registry.aliyuncs.com/google_containers

成功后在 kube0.vm 执行以下命令,以便于使用 kubectl:

# 非 root 用户
mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config

# root 用户
export KUBECONFIG=/etc/kubernetes/admin.conf

6. 安装网络插件 flannel

执行:

kubectl apply -f https://raw.githubusercontent.com/coreos/flannel/master/Documentation/kube-flannel.yml

如果报错:

The connection to the server raw.githubusercontent.com was refused - did you specify the right host or port?

可以把 yaml 文件拷贝到本地:

kubectl apply -f ./kube-flannel.yml

# 执行结果
podsecuritypolicy.policy/psp.flannel.unprivileged created
clusterrole.rbac.authorization.k8s.io/flannel created
clusterrolebinding.rbac.authorization.k8s.io/flannel created
serviceaccount/flannel created
configmap/kube-flannel-cfg created
daemonset.apps/kube-flannel-ds created

此时查看 node 状态应该是 Ready 了:

kubectl get nodes

NAME       STATUS   ROLES                  AGE   VERSION
kube0.vm   Ready    control-plane,master   79m   v1.20.1

7. 把多个 Node 加入集群

kube1.vm 和 kube2.vm 的准备工作:

  • 修改 hosts 文件。
  • 修改主机名。
  • 切换到 root 用户。

使用 kube init 成功执行后输出的信息执行以下命令:

# 使用方法
kubeadm join --token <token> <control-plane-host>:<control-plane-port> --discovery-token-ca-cert-hash sha256:<hash>

kubeadm join 10.0.0.6:6443 --token 2i9xrk.97hiq25bjim8cn00 \
    --discovery-token-ca-cert-hash sha256:7e7a21a363bcc8510b239f9269cc2c0d806941f6e476b731ed7b3cd00405004c

过程中遇到的问题都解决后,可以执行 kubectl get all -A 来查看集群是否安装成功。

8. 安装过程中的 trouble shooting

8.1 Docker 启动失败

启动 Docker 时失败,返回 Job for docker.service failed because the control process exited with error code. See "systemctl status docker.service" and "journalctl -xe" for details.

按指示执行 systemctl status docker.service 中主要内容为:Main PID: 1661 (code=start-limited, status=1/FAILURE)

再执行 journalctl -u docker.service,报错内容显示是 JSON 文件解析错误,检查 /etc/docker/daemon.json 中的内容,确实是少了各逗号,修改 JSON 格式后,正确的格式:

{
    "registry-mirrors": ["https://registry.docker-cn.co"],
    "graph": "/mnt/docker-data",
    "storage-driver": "overlay"
}

再执行 systemctl enable docker && systemctl start docker,启动成功。

8.2 kubelet 启动失败

启动失败后,查看状态:

systemctl status kubelet.service

kubelet.service - kubelet: The Kubernetes Node Agent
   Loaded: loaded (/usr/lib/systemd/system/kubelet.service; enabled; vendor preset: disabled)
  Drop-In: /usr/lib/systemd/system/kubelet.service.d
           └─10-kubeadm.conf
   Active: activating (auto-restart) (Result: exit-code) since Mon 2021-01-04 15:58:22 CST; 9s ago
     Docs: https://kubernetes.io/docs/
  Process: 12965 ExecStart=/usr/bin/kubelet $KUBELET_KUBECONFIG_ARGS $KUBELET_CONFIG_ARGS $KUBELET_KUBEADM_ARGS $KUBELET_EXTRA_ARGS (code=exited, status=255)
 Main PID: 12965 (code=exited, status=255)

Jan 04 15:58:22 kube0.vm systemd[1]: kubelet.service: main process exited, code=exited, status=255/n/a
Jan 04 15:58:22 kube0.vm systemd[1]: Unit kubelet.service entered failed state.
Jan 04 15:58:22 kube0.vm systemd[1]: kubelet.service failed.

搜索得知需要先执行 kubeadm init

8.3 kubeadm init 无法拉取镜像

执行 kubeadm init 后,报错:

error execution phase preflight: [preflight] Some fatal errors occurred:
	[ERROR ImagePull]: failed to pull image k8s.gcr.io/kube-apiserver:v1.20.1: output: Error response from daemon: Get https://k8s.gcr.io/v2/: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)
, error: exit status 1
	[ERROR ImagePull]: failed to pull image k8s.gcr.io/kube-controller-manager:v1.20.1: output: Error response from daemon: Get https://k8s.gcr.io/v2/: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)
, error: exit status 1
	[ERROR ImagePull]: failed to pull image k8s.gcr.io/kube-scheduler:v1.20.1: output: Error response from daemon: Get https://k8s.gcr.io/v2/: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)
, error: exit status 1
	[ERROR ImagePull]: failed to pull image k8s.gcr.io/kube-proxy:v1.20.1: output: Error response from daemon: Get https://k8s.gcr.io/v2/: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)
, error: exit status 1
	[ERROR ImagePull]: failed to pull image k8s.gcr.io/pause:3.2: output: Error response from daemon: Get https://k8s.gcr.io/v2/: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)
, error: exit status 1
	[ERROR ImagePull]: failed to pull image k8s.gcr.io/etcd:3.4.13-0: output: Error response from daemon: Get https://k8s.gcr.io/v2/: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)
, error: exit status 1
	[ERROR ImagePull]: failed to pull image k8s.gcr.io/coredns:1.7.0: output: Error response from daemon: Get https://k8s.gcr.io/v2/: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)
, error: exit status 1

这是无法访问 k8s.gcr.io 导致的,可以通过配置 proxy 或设置 --image-repository 来解决。

配置代理的方式未成功,这里用第二种方式:

[root@kube0 ~]# kubeadm init --image-repository registry.aliyuncs.com/google_containers

[init] Using Kubernetes version: v1.20.1
[preflight] Running pre-flight checks
	[WARNING IsDockerSystemdCheck]: detected "cgroupfs" as the Docker cgroup driver. The recommended driver is "systemd". Please follow the guide at https://kubernetes.io/docs/setup/cri/
[preflight] Pulling images required for setting up a Kubernetes cluster
[preflight] This might take a minute or two, depending on the speed of your internet connection
[preflight] You can also perform this action in beforehand using 'kubeadm config images pull'
[certs] Using certificateDir folder "/etc/kubernetes/pki"
[certs] Generating "ca" certificate and key
[certs] Generating "apiserver" certificate and key
[certs] apiserver serving cert is signed for DNS names [kube0.vm kubernetes kubernetes.default kubernetes.default.svc kubernetes.default.svc.cluster.local] and IPs [10.96.0.1 10.0.0.6]
[certs] Generating "apiserver-kubelet-client" certificate and key
[certs] Generating "front-proxy-ca" certificate and key
[certs] Generating "front-proxy-client" certificate and key
[certs] Generating "etcd/ca" certificate and key
[certs] Generating "etcd/server" certificate and key
[certs] etcd/server serving cert is signed for DNS names [kube0.vm localhost] and IPs [10.0.0.6 127.0.0.1 ::1]
[certs] Generating "etcd/peer" certificate and key
[certs] etcd/peer serving cert is signed for DNS names [kube0.vm localhost] and IPs [10.0.0.6 127.0.0.1 ::1]
[certs] Generating "etcd/healthcheck-client" certificate and key
[certs] Generating "apiserver-etcd-client" certificate and key
[certs] Generating "sa" key and public key
[kubeconfig] Using kubeconfig folder "/etc/kubernetes"
[kubeconfig] Writing "admin.conf" kubeconfig file
[kubeconfig] Writing "kubelet.conf" kubeconfig file
[kubeconfig] Writing "controller-manager.conf" kubeconfig file
[kubeconfig] Writing "scheduler.conf" kubeconfig file
[kubelet-start] Writing kubelet environment file with flags to file "/var/lib/kubelet/kubeadm-flags.env"
[kubelet-start] Writing kubelet configuration to file "/var/lib/kubelet/config.yaml"
[kubelet-start] Starting the kubelet
[control-plane] Using manifest folder "/etc/kubernetes/manifests"
[control-plane] Creating static Pod manifest for "kube-apiserver"
[control-plane] Creating static Pod manifest for "kube-controller-manager"
[control-plane] Creating static Pod manifest for "kube-scheduler"
[etcd] Creating static Pod manifest for local etcd in "/etc/kubernetes/manifests"
[wait-control-plane] Waiting for the kubelet to boot up the control plane as static Pods from directory "/etc/kubernetes/manifests". This can take up to 4m0s
[apiclient] All control plane components are healthy after 15.011343 seconds
[upload-config] Storing the configuration used in ConfigMap "kubeadm-config" in the "kube-system" Namespace
[kubelet] Creating a ConfigMap "kubelet-config-1.20" in namespace kube-system with the configuration for the kubelets in the cluster
[upload-certs] Skipping phase. Please see --upload-certs
[mark-control-plane] Marking the node kube0.vm as control-plane by adding the labels "node-role.kubernetes.io/master=''" and "node-role.kubernetes.io/control-plane='' (deprecated)"
[mark-control-plane] Marking the node kube0.vm as control-plane by adding the taints [node-role.kubernetes.io/master:NoSchedule]
[bootstrap-token] Using token: 2i9xrk.97hiq25bjim8cn00
[bootstrap-token] Configuring bootstrap tokens, cluster-info ConfigMap, RBAC Roles
[bootstrap-token] configured RBAC rules to allow Node Bootstrap tokens to get nodes
[bootstrap-token] configured RBAC rules to allow Node Bootstrap tokens to post CSRs in order for nodes to get long term certificate credentials
[bootstrap-token] configured RBAC rules to allow the csrapprover controller automatically approve CSRs from a Node Bootstrap Token
[bootstrap-token] configured RBAC rules to allow certificate rotation for all node client certificates in the cluster
[bootstrap-token] Creating the "cluster-info" ConfigMap in the "kube-public" namespace
[kubelet-finalize] Updating "/etc/kubernetes/kubelet.conf" to point to a rotatable kubelet client certificate and key
[addons] Applied essential addon: CoreDNS
[addons] Applied essential addon: kube-proxy

Your Kubernetes control-plane has initialized successfully!

To start using your cluster, you need to run the following as a regular user:

  mkdir -p $HOME/.kube
  sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
  sudo chown $(id -u):$(id -g) $HOME/.kube/config

Alternatively, if you are the root user, you can run:

  export KUBECONFIG=/etc/kubernetes/admin.conf

You should now deploy a pod network to the cluster.
Run "kubectl apply -f [podnetwork].yaml" with one of the options listed at:
  https://kubernetes.io/docs/concepts/cluster-administration/addons/

Then you can join any number of worker nodes by running the following on each as root:

kubeadm join 10.0.0.6:6443 --token 2i9xrk.97hiq25bjim8cn00 \
    --discovery-token-ca-cert-hash sha256:7e7a21a363bcc8510b239f9269cc2c0d806941f6e476b731ed7b3cd00405004c

8.4 安装 flannel 后报错

执行 kubectl get all -A 时发现 Pod kube-flannel-ds-ktm4q 状态是 CrashLoopBackOff。

查看该 Pod 日志:

kubectl logs kube-flannel-ds-ktm4q -n kube-system

# 错误日志
ERROR: logging before flag.Parse: I0104 16:51:09.442652       1 main.go:519] Determining IP address of default interface
ERROR: logging before flag.Parse: I0104 16:51:09.443306       1 main.go:532] Using interface with name enp0s3 and address 10.0.0.6
ERROR: logging before flag.Parse: I0104 16:51:09.443322       1 main.go:549] Defaulting external address to interface address (10.0.0.6)
W0104 16:51:09.443343       1 client_config.go:608] Neither --kubeconfig nor --master was specified.  Using the inClusterConfig.  This might not work.
ERROR: logging before flag.Parse: I0104 16:51:09.635774       1 kube.go:116] Waiting 10m0s for node controller to sync
ERROR: logging before flag.Parse: I0104 16:51:09.636082       1 kube.go:299] Starting kube subnet manager
ERROR: logging before flag.Parse: I0104 16:51:10.636292       1 kube.go:123] Node controller sync successful
ERROR: logging before flag.Parse: I0104 16:51:10.636327       1 main.go:253] Created subnet manager: Kubernetes Subnet Manager - kube0.vm
ERROR: logging before flag.Parse: I0104 16:51:10.636333       1 main.go:256] Installing signal handlers
ERROR: logging before flag.Parse: I0104 16:51:10.636398       1 main.go:391] Found network config - Backend type: vxlan
ERROR: logging before flag.Parse: I0104 16:51:10.636476       1 vxlan.go:123] VXLAN config: VNI=1 Port=0 GBP=false Learning=false DirectRouting=false
ERROR: logging before flag.Parse: E0104 16:51:10.636817       1 main.go:292] Error registering network: failed to acquire lease: node "kube0.vm" pod cidr not assigned

解决方法:编辑 /etc/kubernetes/manifests/kube-controller-manager.yaml,在 spec -> containers -> -command 下加入两行:

- --allocate-node-cidrs=true
- --cluster-cidr=10.244.0.0/16

然后删除 kube-controller-manager ,它会自动重启,配置就生效了。

8.5 Node 加入集群时报错

执行以下命令加入集群时报错:

kubeadm join 10.0.0.6:6443 --token 2i9xrk.97hiq25bjim8cn00 \
    --discovery-token-ca-cert-hash sha256:7e7a21a363bcc8510b239f9269cc2c0d806941f6e476b731ed7b3cd00405004c
    
# 错误信息
error execution phase preflight: [preflight] Some fatal errors occurred:
	[ERROR FileContent--proc-sys-net-bridge-bridge-nf-call-iptables]: /proc/sys/net/bridge/bridge-nf-call-iptables contents are not set to 1

这是忘记执行这条命令了:

echo 1 > /proc/sys/net/bridge/bridge-nf-call-iptables

8.6 Node 加入集群后,使用 kubectl 命令时报错

Node 在加入集群后,执行 kubectl 命令时会报错:

The connection to the server localhost:8080 was refused - did you specify the right host or port?

之前在 Master 节点时也有这个报错,随后以下命令后就可以使用了:

export KUBECONFIG=/etc/kubernetes/admin.conf

但是在 Worker 节点上执行此命令后,还是报错:

/etc/kubernetes/admin.conf: No such file or directory

这是因为 Worker 节点上没有配置文件,记得取消前面设置的环境变量:

unset KUBECONFIG

然后执行:

mkdir -p $HOME/.kube/
scp root@matet:/etc/kubernetes/admin.conf   $HOME/.kube/config

此时再执行 kubectl 命令就不会报错了。

8.7 集群中的工作节点没有 Role

执行命令:

kubectl get nodes

NAME       STATUS   ROLES                  AGE    VERSION
kube0.vm   Ready    control-plane,master   157m   v1.20.1
kube1.vm   Ready    <none>                 47m    v1.20.1
kube2.vm   Ready    <none>                 42m    v1.20.1

工作节点的 ROLES 为 none,不知道是默认这样还是有什么原因?

ROLES 只是节点的 label,和节点亲和性有关,比如 Pod 不会调度到 master 节点。参考:Assign Pods to Nodes using Node Affinity

添加 ROLE:

kubectl label node <node name> node-role.kubernetes.io/<role name>=<key - (any name)>

删除 ROLE:

kubectl label node <node name> node-role.kubernetes.io/<role name>-

9. 参考

Creating a cluster with kubeadm

在 CentOS 7 环境下用 Docker 运行 Redis 服务

这里简单介绍使用 Docker Compose 来运行 Redis 服务,并通过挂载卷进行数据持久化。关于 Docker Compose,建议看官方文档

安装 Docker Compose

这一部分的前提是已经安装好了 Docker。

先安装 EPEL 库:

yum install epel-release

然后安装 python-pip:

yum install -y python-pip

使用 pip 安装 Docker Compose:

pip install docker-compose

升级 CentOS 7 上的所有 Python 包:

yum upgrade python*

检查 Docker Compose 版本,验证是否成功安装:

docker-compose -v

如果打印出了如下信息,说明完成安装:

docker'compose version 1.16.1, build 6d1ac219

Docker Compose 文件

在 docker-compose.yml 文件中定义容器如何运行:

version: '3'
services:
  Redis:
    image: "redis:5.0-rc"
    container_name: redis
    command: /usr/local/etc/redis/redis.conf
    volumes:
      - ./redis/redis.conf:/usr/local/etc/redis/redis.conf
      - ./redis/data:/data
    ports:
      - "127.0.0.1:6379:6379"
    privileged: true
    restart: always

对以上内容简单说明:

  • version:Compose 文件格式版本号。

  • services:要构建的服务名称,这里是 Redis,即生产环境中的容器。服务只运行一个镜像,它规定镜像运行的方式 - 应该使用哪些端口,应该运行多少个容器副本,等等。

  • image:指定从哪个镜像构建此容器,这里使用的是官方镜像。

  • container_name:容器名称。

  • command:容器启动后执行的命令。这里是指定配置文件,虽然官方镜像的 dockerfile 里面已经指定了 CMDredis-server,这里指定配置文件位置的命令会覆盖 CMD 的命令。

    看了下 dockerfile 中指定的 docker-entrypoint.sh 文件,里面运行了 redis-server 命令,command 标签后面的命令会追加到此命令。最终效果就是执行了:redis-server /usr/local/etc/redis/redis.conf 命令。

  • volumes:

    • 第一行是将宿主机上指定的配置文件挂载到 redis 默认的配置文件上以覆盖它。

    • 第二行是将容器中的目录映射到指定的宿主机目录,以做数据持久化。

  • ports:将容器的 6379 端口映射到宿主机对应端口,以便容器外部访问。

  • restart:容器总是会重启。

运行容器

执行命令 docker-compose up -d,运行 redis 容器。

由于之前没有看 dockerfile 的内容,是按照 macOS 中的目录 /usr/local/var/db/redis 挂载的持久卷,执行此命令后,容器总是重启,查看日志发现以下信息:

[offset 0] Unexpected EOF reading RDB file
[additional info] While doing: start
[additional info] Reading type 0 (string)
[info] 0 keys read
[info] 0 expires
[info] 0 already expired
1:C 14 Aug 2018 04:22:25.052 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
1:C 14 Aug 2018 04:22:25.052 # Redis version=4.9.104, bits=64, commit=00000000, modified=0, pid=1, just started
1:C 14 Aug 2018 04:22:25.052 # Configuration loaded
1:M 14 Aug 2018 04:22:25.053 * Running mode=standalone, port=6379.
1:M 14 Aug 2018 04:22:25.053 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
1:M 14 Aug 2018 04:22:25.053 # Server initialized
1:M 14 Aug 2018 04:22:25.053 # Short read or OOM loading DB. Unrecoverable error, aborting now.
1:M 14 Aug 2018 04:22:25.053 # Internal error in RDB reading function at rdb.c:2055 -> Unexpected EOF reading RDB file
[offset 0] Checking RDB file dump.rdb

后来查看 dockerfile 文件,其中 VOLUMEWORKDIR 指定的目录是 /data。修改后,停止并删除容器,删除宿主机 ./redis/data 目录中的内容,再重新运行容器,正常工作。

Go 并发模式:Context

原文地址

介绍

在 Go 的服务器中,每一个到来的请求都在其各自的 goroutine 中进行处理。请求处理程序通常启动额外的 goroutine 来访问后端,比如在访问数据库和 RPC 服务时。工作在一个请求上的 goroutine 集合通常需要用到请求中的特定值,比如最终用户的身份,授权令牌和请求的截止期限。当请求撤消或超时,工作在该请求上的所有 goroutine 应迅速退出,这样系统就能回收它们使用的资源。

Google 开发了一个 context 包,可以很容易的将请求域的值,取消信号,和跨 API 边界的截止期限传递给处理请求所涉及的所有 goroutine。本文介绍如何使用该包,并提供了完整的工作示例。

Context

context 包的核心是 Context 类型:

// A Context carries a deadline, cancelation signal, and request-scoped values
// across API boundaries. Its methods are safe for simultaneous use by multiple
// goroutines.
type Context interface {
    // Done returns a channel that is closed when this Context is canceled
    // or times out.
    Done() <-chan struct{}

    // Err indicates why this context was canceled, after the Done channel
    // is closed.
    Err() error

    // Deadline returns the time when this Context will be canceled, if any.
    Deadline() (deadline time.Time, ok bool)

    // Value returns the value associated with key or nil if none.
    Value(key interface{}) interface{}
}

Done 方法给代表 Context 运行的函数返回一个作为取消信号的 channel:当通道关闭时,该函数应放弃它们的作业并返回。Err 方法返回一个 error,指示 Context 被取消的原因。

Context 不具有 Cancel 方法,因为同一原因,Done 通道只用于接收:接收取消信号的函数通常不是发送该信号的那一个。特别是,当父操作为子操作启动 goroutine时,这些子操作不应该能够取消父操作。作为替代,WithCancel 函数提供取消新 Context 值的方法。

Context 被多个 goroutine 同时使用是安全地。代码中可以将一个 Context 传递给任意数量的 goroutine,并且可以通过退出该 Context 来给所有使用它的 goroutine 发出退出信号。

Deadline 方法允许函数来决定它们是否应该开始工作;如果剩下的时间太短,这时开始工作可能不太划算。代码也可以使用一个 deadline 来为 I/O 操作设置超时。

Value 允许 Context 携带请求域的数据。该数据在由多个 goroutine 同时使用时必须是安全的。

衍生的 context

context 包提供了从现有值衍生出新的 Context 值的函数。这些值形成树状结构:当一个 Context 取消时,从它衍生出的所有 Context 也将被取消。

Background 是任何 Context 树的根,它永远不会退出:

// Background returns an empty Context. It is never canceled, has no deadline,
// and has no values. Background is typically used in main, init, and tests,
// and as the top-level Context for incoming requests.
func Background() Context

WithCancelWithTimeout 返回衍生出的 Context 值,它们可以比父 Context 更早取消。与传入的请求相关联的 Context 通常在请求处理程序返回时被取消。在使用多个副本时,WithCancel 对于取消多余的请求很有用。需要给发往后端服务的请求设置 deadline 时 WithCancel 很有用。

// WithCancel returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed or cancel is called.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

// A CancelFunc cancels a Context.
type CancelFunc func()

// WithTimeout returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed, cancel is called, or timeout elapses. The new
// Context's Deadline is the sooner of now+timeout and the parent's deadline, if
// any. If the timer is still running, the cancel function releases its
// resources.
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

WithValue 提供了一种将请求域值与 Context 相关联的方法:

// WithValue returns a copy of parent whose Value method returns val for key.
func WithValue(parent Context, key interface{}, val interface{}) Context

工作实例是了解如何使用 context 包的最好方式。

实例:谷歌网络搜索

这个例子是一个 HTTP 服务器,它通过将查询 golang 转发到 Google Web Search API 并呈现结果,来处理类似 /search?q=golang&timeout=1s 这样的 URL 。timeout 参数告诉服务器在经过这段时间之后取消该请求。

代码被拆分为三个包:

  • server 提供 main 函数和 /search 的处理程序。
  • userip 提供从请求中提取用户 IP 地址并将其与 Context 相关联的函数。
  • google 提供了向 Google 发送查询的 Search 函数。

server 程序

server 程序通过提供 “golang” 的前几个 Google 搜索结果来处理类似 /search?q=golang 这样的请求。它注册 handleSearch 来处理 /search 端点。处理程序创建了一个叫做 ctx 的初始化 Context ,并安排在处理程序返回时取消它。如果请求包含 timeout URL 参数,则在超时后 Context 自动退出。

func handleSearch(w http.ResponseWriter, req *http.Request) {
    // ctx is the Context for this handler. Calling cancel closes the
    // ctx.Done channel, which is the cancellation signal for requests
    // started by this handler.
    var (
        ctx    context.Context
        cancel context.CancelFunc
    )
    timeout, err := time.ParseDuration(req.FormValue("timeout"))
    if err == nil {
        // The request has a timeout, so create a context that is
        // canceled automatically when the timeout expires.
        ctx, cancel = context.WithTimeout(context.Background(), timeout)
    } else {
        ctx, cancel = context.WithCancel(context.Background())
    }
    defer cancel() // Cancel ctx as soon as handleSearch returns.

处理程序从请求中提取查询,并且通过调用 userip 包来提取客户端的 IP 地址。客户端的 IP 地址是后端请求所需要的,因此 handleSearch 将其附加到 ctx 上:

    // Check the search query.
    query := req.FormValue("q")
    if query == "" {
        http.Error(w, "no query", http.StatusBadRequest)
        return
    }

    // Store the user IP in ctx for use by code in other packages.
    userIP, err := userip.FromRequest(req)
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    ctx = userip.NewContext(ctx, userIP)

处理程序使用 ctxquery 调用 google.Search

    // Run the Google search and print the results.
    start := time.Now()
    results, err := google.Search(ctx, query)
    elapsed := time.Since(start)

如果搜索成功,处理程序呈现结果:

    if err := resultsTemplate.Execute(w, struct {
        Results          google.Results
        Timeout, Elapsed time.Duration
    }{
        Results: results,
        Timeout: timeout,
        Elapsed: elapsed,
    }); err != nil {
        log.Print(err)
        return
    }

userip 包

userip 提供从请求中提取用户 IP 地址并将其与 Context 相关联的函数。一个 Context 提供一个键-值映射,键和值都是 interface{} 类型。键类型必须可比较,并且值供多个 goroutine 同时使用时必须是安全的。类似 userip 这样的包隐藏了此映射的细节,并且提供对特定 Context 值的强类型访问。

为了避免键冲突,userip 定义一个非输出类型的 key ,并且使用此类型的值作为 context 键:

// The key type is unexported to prevent collisions with context keys defined in
// other packages.
type key int

// userIPkey is the context key for the user IP address.  Its value of zero is
// arbitrary.  If this package defined other context keys, they would have
// different integer values.
const userIPKey key = 0

FromRequesthttp.Request 提取 userIP 值:

func FromRequest(req *http.Request) (net.IP, error) {
    ip, _, err := net.SplitHostPort(req.RemoteAddr)
    if err != nil {
        return nil, fmt.Errorf("userip: %q is not IP:port", req.RemoteAddr)
    }

NewContext 返回一个新的包含提供的 userIP 值的 Context

func NewContext(ctx context.Context, userIP net.IP) context.Context {
    return context.WithValue(ctx, userIPKey, userIP)
}

FromContextContext 提取 userIP

func FromContext(ctx context.Context) (net.IP, bool) {
    // ctx.Value returns nil if ctx has no value for the key;
    // the net.IP type assertion returns ok=false for nil.
    userIP, ok := ctx.Value(userIPKey).(net.IP)
    return userIP, ok
}

google 包

google.Search 函数生成一个到 Google Web Search API 的请求,并且解析 JSON 编码的结果。它接收 Context 参数 ctx ,并且如果 ctx.Done 在请求正在进行时关闭,则立即返回。

Google 网络搜索 API 请求包含搜索查询和用户 IP 作为查询参数:

func Search(ctx context.Context, query string) (Results, error) {
    // Prepare the Google Search API request.
    req, err := http.NewRequest("GET", "https://ajax.googleapis.com/ajax/services/search/web?v=1.0", nil)
    if err != nil {
        return nil, err
    }
    q := req.URL.Query()
    q.Set("q", query)

    // If ctx is carrying the user IP address, forward it to the server.
    // Google APIs use the user IP to distinguish server-initiated requests
    // from end-user requests.
    if userIP, ok := userip.FromContext(ctx); ok {
        q.Set("userip", userIP.String())
    }
    req.URL.RawQuery = q.Encode()

Search 使用辅助函数 httpDo 发出 HTTP 请求,并且如果 ctx.Done 在处理请求或响应时关闭,则将其退出。SearchhttpDo 传递一个闭包来处理 HTTP 响应:

    var results Results
    err = httpDo(ctx, req, func(resp *http.Response, err error) error {
        if err != nil {
            return err
        }
        defer resp.Body.Close()

        // Parse the JSON search result.
        // https://developers.google.com/web-search/docs/#fonje
        var data struct {
            ResponseData struct {
                Results []struct {
                    TitleNoFormatting string
                    URL               string
                }
            }
        }
        if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
            return err
        }
        for _, res := range data.ResponseData.Results {
            results = append(results, Result{Title: res.TitleNoFormatting, URL: res.URL})
        }
        return nil
    })
    // httpDo waits for the closure we provided to return, so it's safe to
    // read results here.
    return results, err

httpDo 函数运行 HTTP 请求并在一个新的 goroutine 中处理它的应答。如果 ctx.Done 在 goroutine 退出之前关闭,它将取消请求:

func httpDo(ctx context.Context, req *http.Request, f func(*http.Response, error) error) error {
    // Run the HTTP request in a goroutine and pass the response to f.
    tr := &http.Transport{}
    client := &http.Client{Transport: tr}
    c := make(chan error, 1)
    go func() { c <- f(client.Do(req)) }()
    select {
    case <-ctx.Done():
        tr.CancelRequest(req)
        <-c // Wait for f to return.
        return ctx.Err()
    case err := <-c:
        return err
    }
}

为 Contexts 适应代码

许多服务器框架为承载请求域的值提供包和类型。我们可以定义 Context 接口的新实现,来对使用现有框架的代码和期待一个 Context 参数的代码进行桥接。

例如, Gorilla 的 github.com/gorilla/context 包运行处理程序通过提供一个从 HTTP 请求到键值对的映射来将数据与传入请求想关联。在 gorilla.go 中,我们提供了一个 Context 的实现,它的 value 方法返回与 Gorilla 中一个特定 HTTP 请求相关联的值。

其它包提供了与 Context 类似的取消支持。例如,Tomb 提供一个 Kill 方法,它通过关闭一个 Dying channel 来表示取消。Tomb 还提供了等待那些 goroutine 退出的方法,类似于 sync.WaitGroup。在 tomb.go 中,我们提供了一个 Context 的实现,当它的父 Context 退出或一个提供的 Tomb 被杀掉时,它也将退出。

总结

在 Google,我们要求 Go 程序员在输入和输出请求之间的调用路径上的每个函数中,将 Context 作为第一个参数来传递。这让不同团队之间的 Go 代码开发有良好的互操作性。它提供了对超时、退出还有确保对像安全凭据这样的临界值正确地传输 Go 程序的简单控制。

想基于 Context 建立的服务器框架应该提供 Context 的实现来对它们的包与那些期待 Context 参数的代码进行桥接。它们的客户端库随后将从调用代码中接收一个 Context。通过为请求域值和退出信号建立一个通用接口,Context 使包的开发者更容易共享用于创建可伸缩服务的代码。

使 GitHub 与 GitLab 在同一台电脑共存的配置

有的项目用 GitLab,有的项目用 GitHub 时,需要配置一下。

1. 创建各自的私钥/公钥

例如创建 GitLab 的私钥/公钥,用以下命令:

ssh-keygen -t rsa -f ~/.ssh/id_rsa.gitlab -C "[email protected]"

2. 配置 config 文件

~/.ssh 目录下创建 config 文件:

touch config

配置:

Host *.example.com
IdentityFile ~/.ssh/id_rsa.gitlab
User username

# Personal GitHub
Host github.com
IdentityFile ~/.ssh/id_rsa.github
User username

3. 上传至各自网站

复制公钥:

pbcopy < ./id_rsa.gitlab.pub

上传至各自网站 SSH Key 处。

4. 验证

通过以下命令:

出现 Welcome to GitLab 类似结果,即成功。

5. Git 配置用户名及邮箱

5.0 删除全局配置

如果之前配置过全局用户名及邮箱,需要删除:

git config --global --unset user.name
git config --global --unset user.email

如果不知道是否配置过,通过下面命令查看:

git config --global user.name
git config --global user.email

5.1 配置用户名及邮箱

进入项目的根目录,执行以下命令:

git config user.name "yourUsername"
git config user.email "[email protected]"

这样配置后,提交时显示的就是各项目指定的用户名和邮箱了。

Go 中的 err shadowing 问题

今天编译代码时遇到了这个问题,记录一下。

在一个函数中,给返回的 error 定义了名称。这个函数定义类似下面这样:

func foo(a string) (err error) {
    res, err := bar(a)
    if err != nil {
        return
    }
    
    // do anything else
    
    return
}

编译时会报这样的错误:

../../../../server/init.go:455:4: err is shadowed during return

这个问题很容易被忽略,返回的错误意思是函数在 return 时,err 被覆盖了,被谁覆盖了?

本来在调用 bar 函数发生错误时,应该返回的是当前发生的错误,但由于返回值中定义了错误名称,所以返回的是外面函数返回值中定义的 err,而不是 if err != nil 里这个 err,这样应当返回的错误就被外面那个 err 给覆盖了。

如果还要用这种定义返回值名称的方式,可以显式的返回 if 中的错误:

func foo(a string) (err error) {
    res, err := bar(a)
    if err != nil {
        return err
    }
    
    // do anything else
    
    return
}

不过这种写法还是没有在函数内部显式的声明变量来的直观:

func foo(a string) error {
    res, err := bar(a)
    if err != nil {
        return err
    }
    
    // do anything else
    
    return nil
}

Tendermint ABCI 应用 KVStore 源码详解

Tendermint abci 项目主页

这篇文章以 ABCI 示例 KVStore 应用及默认的 socket 连接为例说明 ABCI 应用的启动及 abci-cli 客户端与其交互的过程,以加深开发 ABCI 应用的模式及源码组织方式的理解。

整体流程说明

  1. ABCI 应用服务端:在命令行执行 abci-cli kvstore 启动应用后,它会在 46658 端口等待客户端的 TCP 连接。
  2. abci-cli 客户端:这个客户端指的是 echoinfodeliverTx 等子命令。执行这些命令会建立一条与 ABCI 应用服务端的 TCP 连接,并将子命令后面的参数当作请求消息发送给应用服务端进行处理(这里的 ABCI 应用与 Tendermint 节点绑定在一起)。

构建命令过程

程序入口在 abci/cmd/abci-cli/main.goExecute 函数。

这里面做事情有:

  • 构建 RootCmd 命令,即 abci-cli 命令及各子命令。

  • 注册全局 Flags,主要包括:

    • ABCI 应用服务端监听地址 flagAddress ,默认为:tcp://0.0.0.0:46658
    • abci-cli 客户端与 ABCI 应用服务端的通信协议 flagAbci,默认为:socket
    • 日志等级 flagLogLevel,默认为:debug
  • 添加 ABCI 应用 kvstoredummy 以及 echoCmdinfoCmddeliverTxCmdcommitCmd 等客户端命令。

RootCmd、kvstore 命令的实现及启动

RootCmd 主命令逻辑

所有子命令都要添加到 RootCmd 主命令下面。

执行 abci-cli 命令只会列出其使用文档,在执行具体子命令时才会执行其定义的相应应用逻辑。

这里只需看这段代码:

// 执行主命令时如果 client 为空,会创建 client 并启动,会根据 flagAbci 参数来判断要创建 socket 客户端
// 还是 RPC 客户端
if client == nil {
	var err error
	client, err = abcicli.NewClient(flagAddress, flagAbci, false)
	if err != nil {
		return err
	}
	client.SetLogger(logger.With("module", "abci-client"))

	// 启动客户端,实际执行的是 client.OnStart() 函数
	if err := client.Start(); err != nil {
		return err
	}
}

abci-cli 客户端

在命令行执行 abci-cli echo hello,会与 ABCI 应用服务端建立一条 TCP 连接并将 “abc” 发送到服务端进行处理,收到应后断开连接。但这样比较麻烦,可以使用 abci-cli console 命令可以在交互式命令行中与应用服务端交互。

通过调用 abcicli.NewClient 函数来创建客户端。

返回的是接口 abci/client/Client,这个接口有 socketgrpclocal 三种实现,但 flag 只可以指定前两种,默认为 socket

func NewClient(addr, transport string, mustConnect bool) (client Client, err error) {
	switch transport {
	case "socket":
		client = NewSocketClient(addr, mustConnect)
	case "grpc":
		client = NewGRPCClient(addr, mustConnect)
	default:
		err = fmt.Errorf("Unknown abci transport %s", transport)
	}
	return
}

abci/client/Client 接口继承了 tmlibs/common/service/Service 接口,可以启动、停止和重置。客户端和服务端都需要这些功能,使用时可以通过把 BaseService 作为自定义结构的匿名字段来实现。

首先看 socketClient 函数的结构,它实现了 abci/client/Client 接口,由于 cmn.BaseService 结构是它的匿名字段,也间接实现了 tmlibs/common/service/Service 接口:

type socketClient struct {
    // 这个结构实现了 Service 接口,内部包含 impl Service字段,即具体实现 Service 的结构
	cmn.BaseService

    // 用来传递请求及应答消息的通道,
	reqQueue    chan *ReqRes
	flushTimer  *cmn.ThrottleTimer
	mustConnect bool

	mtx     sync.Mutex
	addr    string
	conn    net.Conn
	err     error
    
    // 标准库中的双向链表,从 reqQueue 读取到的请求消息都会先放入这个链表的尾端
	reqSent *list.List
	resCb   func(*types.Request, *types.Response) // listens to all callbacks
}

创建 socketClient 的函数:

func NewSocketClient(addr string, mustConnect bool) *socketClient {
	cli := &socketClient{
		reqQueue:    make(chan *ReqRes, reqQueueSize),
		flushTimer:  cmn.NewThrottleTimer("socketClient", flushThrottleMS),
		mustConnect: mustConnect,

		addr:    addr,
		reqSent: list.New(),
		resCb:   nil,
	}
    
    // 这里传入了具体实现 Service 的结构 cli
	cli.BaseService = *cmn.NewBaseService(nil, "socketClient", cli)
	return cli
}

在命令行执行 abci-cli kvstore 命令时会执行 client.Start() 启动服务,这里用默认的 socket 连接及 kvstore 应用举例说明。这里执行的是 socketClient 结构中匿名字段 cmn.BaseService 的方法:

func (bs *BaseService) Start() error {
	if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
		if atomic.LoadUint32(&bs.stopped) == 1 {
			bs.Logger.Error(Fmt("Not starting %v -- already stopped", bs.name), "impl", bs.impl)
			return ErrAlreadyStopped
		}
		bs.Logger.Info(Fmt("Starting %v", bs.name), "impl", bs.impl)
        
        // 这里实际执行上面传入的 cli(即 socketClient 结构) 的 OnStart 函数来启动服务
		err := bs.impl.OnStart()
		if err != nil {
			// revert flag
			atomic.StoreUint32(&bs.started, 0)
			return err
		}
		return nil
	}
	bs.Logger.Debug(Fmt("Not starting %v -- already started", bs.name), "impl", bs.impl)
	return ErrAlreadyStarted
}

以上就是客户端的启动过程。

ABCI 应用服务端

现在看一下执行 abci-cli kvstore 命令都做了什么。

执行此命令时,实际执行的是 cmdKVStore 函数,启动了应用服务端,在 tcp://0.0.0.0:46658 监听连接。

启动服务端:

// 默认创建 socket 服务端
srv, err := server.NewServer(flagAddrD, flagAbci, app)
	if err != nil {
		return err
	}
	srv.SetLogger(logger.With("module", "abci-server"))
	if err := srv.Start(); err != nil {
		return err
	}

NewSocketServer 创建服务端:

func NewSocketServer(protoAddr string, app types.Application) cmn.Service {
	proto, addr := cmn.ProtocolAndAddress(protoAddr)
	s := &SocketServer{
		proto:    proto,
		addr:     addr,
		listener: nil,
		app:      app,
		conns:    make(map[int]net.Conn),
	}
    // 这里使用的模式与 Client 段一致
	s.BaseService = *cmn.NewBaseService(nil, "ABCIServer", s)
	return s
}

执行 srv.Start() 函数时,实际执行的是 SocketServer 的实现,通过 BaseService 结构的 Start 方法调用:

func (s *SocketServer) OnStart() error {
	if err := s.BaseService.OnStart(); err != nil {
		return err
	}
	ln, err := net.Listen(s.proto, s.addr)
	if err != nil {
		return err
	}
	s.listener = ln
    // 启动一个协程来监听连接
	go s.acceptConnectionsRoutine()
	return nil
}

至此已经把 ABCI 应用服务端是如何启动的说明了,下面的部分会详细说明请求及应答处理的细节。

请求及应答处理

这部分以 deliver_tx 命令为例来进行说明。

abci-cli 客户端

为了方便,这里再看一下 socketClient 的数据结构:

type socketClient struct {
	cmn.BaseService

	reqQueue    chan *ReqRes
	flushTimer  *cmn.ThrottleTimer
	mustConnect bool

	mtx     sync.Mutex
	addr    string
	conn    net.Conn
	err     error
    
    // 这里会把请求写入双向链表 reqSent 的尾端,在 recvResponseRoutine 函数中接收到应答时会从此链表取出
    // 第一个请求进行类型比较,如果与应答类型一样则返回给前端
	reqSent *list.List
	resCb   func(*types.Request, *types.Response) // listens to all callbacks

}

OnStart 函数所做的就是与服务端建立连接,启动两个协程来处理请求与应答。

先看处理请求的函数 sendRequestsRoutine

func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
	w := bufio.NewWriter(conn)
	for {
		select {
            // 发送 flush 类型请求的定时器
		case <-cli.flushTimer.Ch:
			select {
			case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
			default:
				// Probably will fill the buffer, or retry later.
			}
		case <-cli.Quit():
			return
		case reqres := <-cli.reqQueue:
             // 这里会把请求写入双向链表 reqSent 的尾端
			cli.willSendReq(reqres)
             // 把请求消息写入连接缓冲,这时还没有发送给服务端
			err := types.WriteMessage(reqres.Request, w)
			if err != nil {
				cli.StopForError(fmt.Errorf("Error writing msg: %v", err))
				return
			}
             // 如果请求是 flush 类型,会把缓冲的请求消息 (包括此 flush 请求) 写入连接,
             // 由 kvstore 服务端接收并处理。
             // 有两种方式发送 flush 类型的请求:1) 定时器触发;2) DeliverTxSync 函数中主动发送
			if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
				err = w.Flush()
				if err != nil {
					cli.StopForError(fmt.Errorf("Error flushing writer: %v", err))
					return
				}
			}
		}
	}
}

现在看处理应答的 recvResponseRoutine 函数:

func (cli *socketClient) recvResponseRoutine(conn net.Conn) {

	r := bufio.NewReader(conn) // Buffer reads
	for {
		var res = &types.Response{}
        // 从连接中读取应答,出错时会关闭连接并执行 flushQueue() 释放 wg.WaitGroup
		err := types.ReadMessage(r, res)
		if err != nil {
			cli.StopForError(err)
			return
		}
		switch r := res.Value.(type) {
		case *types.Response_Exception:
			cli.StopForError(errors.New(r.Exception.Error))
			return
		default:
             // 应答处理逻辑在这里
			err := cli.didRecvResponse(res)
			if err != nil {
				cli.StopForError(err)
				return
			}
		}
	}
}

func (cli *socketClient) didRecvResponse(res *types.Response) error {
	cli.mtx.Lock()
	defer cli.mtx.Unlock()

    // 从双向链表 reqSent 中取出第一个请求
	next := cli.reqSent.Front()
	if next == nil {
		return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res.Value))
	}
	reqres := next.Value.(*ReqRes)
     // 检查请求与应答的类型是否匹配
	if !resMatchesReq(reqres.Request, res) {
		return fmt.Errorf("Unexpected result type %v when response to %v expected",
			reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
	}

	reqres.Response = res    // Set response
    reqres.Done()            // 释放此请求创建时执行的 wg.Add(1)
	cli.reqSent.Remove(next) // 从链表中删除第一个请求

	// Notify reqRes listener if set
	if cb := reqres.GetCallback(); cb != nil {
		cb(res)
	}

	// Notify client listener if set
	if cli.resCb != nil {
		cli.resCb(reqres.Request, res)
	}

	return nil
}

ABCI 应用服务端

创建应用细节

先看应用的数据结构 KVStoreApplication

// 这个结构只实现了 Info、DeliverTx、CheckTx、Commit 和 Query 方法
type KVStoreApplication struct {
    // 这个基础结构实现了 "tendermint/abci/types/Application" 接口(此项目中基本都是用的这种模式)。
    // 这个结构实现的接口的方法中没有具体应用逻辑,以供开发者在自己的应用结构中继承此结构后,可以只实现
    // 必须的方法,而无需实现接口的全部方法
	types.BaseApplication

	state State
}

创建应用:

func NewKVStoreApplication() *KVStoreApplication {
	state := loadState(dbm.NewMemDB())
	return &KVStoreApplication{state: state}
}

主要看 loadState 函数,它根据键 stateKey 从内存存储 MemDB 结构中获取对应状态,因为是初始化,肯定没有对应值,返回的是一个带有新建 MemDB (就是一个带锁的 map)的 State

func loadState(db dbm.DB) State {
	stateBytes := db.Get(stateKey)
	var state State
	if len(stateBytes) != 0 {
		err := json.Unmarshal(stateBytes, &state)
		if err != nil {
			panic(err)
		}
	}
	state.db = db
	return state
}

服务端处理请求及应答细节

重点看接受连接的函数:

func (s *SocketServer) acceptConnectionsRoutine() {
	for {
		// 接受连接,下面这些日志就是命令行启动 kvstore 后看到的信息
		s.Logger.Info("Waiting for new connection...")
		conn, err := s.listener.Accept()
		if err != nil {
			if !s.IsRunning() {
				return // Ignore error from listener closing.
			}
			s.Logger.Error("Failed to accept connection: " + err.Error())
			continue
		}

		s.Logger.Info("Accepted a new connection")

        // 可以接受多条连接并记录
		connID := s.addConn(conn)

		closeConn := make(chan error, 2)              // Push to signal connection closed
		responses := make(chan *types.Response, 1000) // A channel to buffer responses

         // 从连接读取请求并处理
		go s.handleRequests(closeConn, conn, responses)
		
         // 从 'responses' 获取应答并写到连接中
		go s.handleResponses(closeConn, conn, responses)

		// 等待信号来关闭连接
		go s.waitForClose(closeConn, connID)
	}
}

先看处理请求的 handleRequests 函数:

func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
	var count int
	var bufReader = bufio.NewReader(conn)
    
	for {
		var req = &types.Request{}
        // 从连接上读取请求消息,读取完毕或出错后要通知 waitForClose 协程来关闭连接
		err := types.ReadMessage(bufReader, req)
		if err != nil {
			if err == io.EOF {
				closeConn <- err
			} else {
				closeConn <- fmt.Errorf("Error reading message: %v", err.Error())
			}
			return
		}
		s.appMtx.Lock()
		count++
        // 处理请求时要加锁,这个函数会根据请求的类型调用具体函数来处理,
        // 比如 types.Request_DeliverTx 类型时就会调用 KVStoreApplication.DeliverTx 函数来处理,
        // 应答会写入 responses 通道,以便 handleResponses 函数处理
		s.handleRequest(req, responses)
		s.appMtx.Unlock()
	}
}

现在看 handleResponses 函数:

func (s *SocketServer) handleResponses(closeConn chan error, conn net.Conn, responses <-chan *types.Response) {
	var count int
	var bufWriter = bufio.NewWriter(conn)
	for {
        // 从 responses 通道读取应答并写入连接。同样,出错时要通知 waitForClose 来关闭连接
		var res = <-responses
		err := types.WriteMessage(res, bufWriter)
		if err != nil {
			closeConn <- fmt.Errorf("Error writing message: %v", err.Error())
			return
		}
        
         // flush 类型的应答是哪里来的?
         // 与客户端处理类似,如果是此类型要进行 Flush 处理,把缓冲的数据写入连接
		if _, ok := res.Value.(*types.Response_Flush); ok {
			err = bufWriter.Flush()
			if err != nil {
				closeConn <- fmt.Errorf("Error flushing write buffer: %v", err.Error())
				return
			}
		}
		count++
	}
}

deliver_tx 子命令执行过程

现在以此命令为例说明发起请求及收到应答的整体过程。

  1. 服务端启动,启动两个协程,一个处理请求,一个处理应答。
  2. 在命令行输入 abci-cli console 进入交互模式,创建客户端并与服务端建立了持久连接。服务端和客户端各启动两个协程,一个处理请求,一个处理应答。
  3. 输入 deliver_tx "abc",客户端会识别子命令,调用 cmdDeliverTx 函数,此函数在解析到 tx 后进行编码,随后会调用 cli.DeliverTxSync(txBytes) (同步的) 函数。
  4. 请求消息写入连接后,服务端读取到此请求。
  5. 根据请求类型,调用 KVStore 应用的 DeliverTx 函数进行处理。
  6. 服务端处理完毕后的应答写入连接,客户端接收到应答,前端打印显示到命令行。

后端 Web 开发升级打怪手册

这篇文章 由浅入深系统的介绍了后端开发者进行 Web 开发的学习路线。原文对每条内容都有解释,建议看原文。这里是为了看着方便,便于对自己学习内容进行查漏补缺,对关键内容简单进行了翻译整理。

backend_roadmap

  1. 语言:学习一门编程语言,如 Python、Ruby、Golang、Rust 等等等等。
  2. 练习:用这门语言练习做一些小工具,比如实现一些命令、读取目录并以 JSON 展示啦等等。
  3. 包管理:学习对应语言的包管理工具,如 Node.js 的 NPM 或 Yarn。
  4. 最佳实践:遵循该语言的标准进行最佳实践。
  5. 安全:学习安全方面的知识,阅读 OWASP 教程。理解不同的安全问题,以及怎样避免它们。
  6. 练习:运用以上技能进行练习,可以在 GitHub 上找一些开源库,试着用你学到的最佳实践来重构、解决别人提出的问题,或者增加一些新功能,进行 PR。
  7. 测试:学习进行单元测试、集成测试、mock 测试、stubs 等。
  8. 练习:对以上做过的项目写单元测试进行练习。并学习计算测试的覆盖率。
  9. SQL 数据库:学习如何把数据持久化到关系型数据库,如 MySQL。
  10. 练习:可以写一个博客,用以上所学添加各种功能。
  11. 框架:学习一个 Web 框架。
  12. 练习:用框架把前面所学、所写来实现一遍。
  13. NoSQL 数据库:首先理解什么是非关系型数据库,它们与关系型数据库的区别,以及为什么需要它们。再去学习如 MongoDB 等数据库。
  14. 缓存:学习怎样实现应用层缓存。理解怎样使用 Redis 或 Memcached 并在之前做的应用中实现缓存。
  15. RESTful API:理解 REST 并学习如何创建 RESTful API,并阅读 Roy Fielding 的原始论文。当别人说 REST 就是写 HTTP API 时你可以反驳他。
  16. Auth:学习不同的身份验证和授权方式。理解它们是什么,它们有什么区别,以及何时使用哪个更好。大概有这几种:Oauth、Basic Authentication、Token、JWT 以及 OpenID。
  17. Message Brokers:学习消息代理中间件并理解何时以及为什么要用它们。可以学习 RabbitMQ 或 Kafka,首选 RabbitMQ。
  18. 搜索引擎:当你的应用数据量上来后,简单地查询数据库就不能满足性能需求,这时就需要用到搜索引擎了。
  19. Docker:学习 Docker 是什么、为什么要使用它以及如何使用它。
  20. Web 服务器:找出不同 Web 服务器之间的差异,了解限制和不同的可用配置选项以及如何最好地利用这些限制来编写应用程序。
  21. Web-Socket:虽然不是必需的,但在你的工具带中获得这些知识是有益的。了解如何使用 Web 套接字编写实时 Web 应用程序,并使用它进行一些示例应用程序。可以在上面的博客应用中使用它来实现博客文章列表的实时更新。
  22. GraphQL:学习它与 REST 有何不同,以及它为什么被称为 REST 2.0。
  23. 图形数据库:图形 models 是表示处理数据关系的一种非常灵活的方式,图形数据库提供快速有效的存储、检索和查询。可以学习如:Neo4j 或 OrientDB。
  24. 继续探索:保持开放的心态,持续学习新事物。最关键的是尽可能多地练习。刚开始时会显得很可怕,你可能觉得自己没有学到任何东西,但这是正常的,随着时间的推移,你会觉得自己变得更好。

Tendermint 节点启动源码分析

Tendermint 节点启动源码分析

本文以官方示例公链 Basecoin 的 basecoind start 命令为入口,结合日志与源码分析 Tendermint 节点创建、启动及生成区块等过程。

文中代码细节较多,但限于篇幅没有太深入某些细节,比如共识过程。如只想快速了解整体过程,可只阅读有 basecoind start 日志部分的内容。

start 命令入口

执行 basecoind init 命令初始化 genesis 配置、priv-validator 文件及 p2p-node 文件后,在命令行执行 basecoind start 启动节点。在不带有任何参数时,Tendermint 会与 ABCI 应用一起执行:

日志Starting ABCI with Tendermint module=main

func(cmd *cobra.Command, args []string) error {
	if !viper.GetBool(flagWithTendermint) {
		ctx.Logger.Info("Starting ABCI without Tendermint")
		return startStandAlone(ctx, appCreator)
	}
	ctx.Logger.Info("Starting ABCI with Tendermint")
	return startInProcess(ctx, appCreator)
}

创建节点

这一节把 NewNode()

创建三条连接的客户端

startInProcess 会创建 Tendermint 节点,然后启动节点。

创建节点时 Tendermint 与 ABCI 应用会创建建立三条连接所需的客户端:即 querymempool 以及 consensus 连接。

**注意:**所有要启动服务都是通过 BaseService.Start 来执行的。

日志Starting multiAppConn module=proxy impl=multiAppConn

NewNode() 中相关代码:

proxyApp := proxy.NewAppConns(clientCreator, handshaker)

// 实际执行的是 multiAppConn.multiAppConn()
if err := proxyApp.Start(); err != nil {
		return nil, fmt.Errorf("Error starting proxy app connections: %v", err)
	}

三条连接的客户端的建立在 multiAppConn.OnStart 方法中完成,在这里三条连接的客户端都是 localClient 结构,由于 localClient 没有实现 OnStart 方法,它们的 Start 方法调用的都是 cmn.BaseServiceOnStart 方法,也就是 querycli.Start() (另两个也一样)除了打印日志外,什么都不做。

接下来分别把 localClient 封装成了 appConnQueryappConnMempoolappConnConsensus 结构。

日志Starting localClient module=abci-client connection=query impl=localClient

multiAppConn.OnStart() 中相关代码:

// 在创建节点时传入的 clientCreator 的具体实现是 localClientCreator 结构,
// 这里最终返回的是 localClient 结构
querycli, err := app.clientCreator.NewABCIClient()

// 执行的是 cmn.BaseService 的 OnStart 方法,啥都没做
if err := querycli.Start(); err != nil {
		return errors.Wrap(err, "Error starting ABCI client (query connection)")
	}

其它两个连接的客户端与 query 连接客户端的创建相同。

握手同步

在建立完毕以上三条连接的客户端后,会执行 app.handshaker.Handshake(app) 来握手,确保 Tendermint 节点与应用程序的状态是同步的。

日志

ABCI Handshake                               module=consensus appHeight=169313 appHash=DC1ED303D0D1EE403CC010B911D7A991E3EAE7E3

ABCI Replay Blocks                           module=consensus appHeight=169313 storeHeight=169313 stateHeight=169313

Completed ABCI Handshake - Tendermint and App are synced module=consensus appHeight=169313 appHash=DC1ED303D0D1EE403CC010B911D7A991E3EAE7E3

query 连接上通过 ABCI Info 查询 ABCI 应用的 blockstore 中的最新状态,然后将 Tendermint 节点的区块重放到此状态:

multiAppConn.OnStart() 中相关代码:

if app.handshaker != nil {
   return app.handshaker.Handshake(app)
}

Handshaker.Handshake() 中相关代码:

// 从 ABCI 应用的 blockstore 中获取最新状态
res, err := proxyApp.Query().InfoSync(abci.RequestInfo{version.Version})

// 重放所有区块到最新状态
_, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)

重放完毕后,Tendermint 节点已经和 ABCI 应用同步到了相同的区块高度。

快速同步设置及验证人节点确认

在进入代码细节之前,先了解一下快速同步的概念。

快速同步(FastSync):在当前节点落后于区块链的最新状态时,需要进行节点间同步,快速同步只下载区块并检查验证人的默克尔树,比运行实时一致性八卦协议快得多。一旦追上其它节点的状态,守护进程将切换出快速同步并进入正常共识模式。在运行一段时间后,如果此节点至少有一个 peer,并且其区块高度至少与最大的报告的 peer 高度一样高,则认为该节点已追上区块链最新状态(即 caught up)。

现在回到 NewNode() 源码,当节点同步到 ABCI 应用的最新状态后,会检查当前状态的验证人集合中是否只有当前节点一个验证人,如果是,则无需快速同步。

// 重新从数据库加载状态,因为可能在握手时有更新
state = sm.LoadState(stateDB)

// Decide whether to fast-sync or not
// We don't fast-sync when the only validator is us.

// 此字段用来指定当此节点在区块链末端有许多区块需要同步时,是否启用快速同步功能,默认为 true
fastSync := config.FastSync
if state.Validators.Size() == 1 {
   addr, _ := state.Validators.GetByIndex(0)
   if bytes.Equal(privValidator.GetAddress(), addr) {
      fastSync = false
   }
}

日志

This node is a validator                     module=consensus addr=FF2AF6957DCD4B2FA8373D2157DE67278C5F0E41 pubKey=PubKeyEd25519{476CA31AE9AFB1FDC2BE1A00C32796FE5EEDBE3BD4C3C05CD1A76A4E975FB975}

NewNode() 中相关代码,显示当前节点是否是验证人:

// Log whether this node is a validator or an observer
if state.Validators.HasAddress(privValidator.GetAddress()) {
   consensusLogger.Info("This node is a validator", "addr", privValidator.GetAddress(), "pubKey", privValidator.GetPubKey())
} else {
   consensusLogger.Info("This node is not a validator", "addr", privValidator.GetAddress(), "pubKey", privValidator.GetPubKey())
}

创建各种 Reactor

Reactor 是处理各类传入消息的结构,一共有 5 种类型,通过将其添加到 Switch 中来实现。先熟悉一下 Switch 的结构:

Switch 处理 peer 连接,并暴露一个 API 以在各类 Reactor 上接收传入的消息。每个 Reactor 负责处理一个或多个 “Channels” 的传入消息。因此,发送传出消息通常在 peer 执行,传入的消息在 Reactor 上接收。

type Switch struct {
   cmn.BaseService

   config       *config.P2PConfig
   listeners    []Listener
   // 添加的 Reactor 都存在这里
   reactors     map[string]Reactor
   chDescs      []*conn.ChannelDescriptor
   reactorsByCh map[byte]Reactor
   peers        *PeerSet
   dialing      *cmn.CMap
   reconnecting *cmn.CMap
   nodeInfo     NodeInfo // our node info
   nodeKey      *NodeKey // our node privkey
   addrBook     AddrBook

   filterConnByAddr func(net.Addr) error
   filterConnByID   func(ID) error

   rng *cmn.Rand // seed for randomizing dial times and orders
}

MempoolReactor

Mempool 是一个有序的内存池,交易在被共识提议之前会存储在这里,而在存储到这里之前会通过 ABCI 应用的 CheckTx 方法检查其合法性。

先看 NewNode() 中相关代码:

mempoolLogger := logger.With("module", "mempool")
// 创建 Mempool
mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight)

// 初始化 Mempool 的  write-ahead log(确保可以从任何形式的崩溃中恢复过来)
mempool.InitWAL() // no need to have the mempool wal during tests
mempool.SetLogger(mempoolLogger)

// 创建 MempoolReactor
mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool)
mempoolReactor.SetLogger(mempoolLogger)

// 这里根据配置,判断是否要等待有交易时才生成新区块
if config.Consensus.WaitForTxs() {
   mempool.EnableTxsAvailable()
}

MempoolReactor 用来在 peer 之间对 mempool 交易进行广播。看一下它的数据结构:

type MempoolReactor struct {
   p2p.BaseReactor
   config  *cfg.MempoolConfig
   Mempool *Mempool
}

func NewMempoolReactor(config *cfg.MempoolConfig, mempool *Mempool) *MempoolReactor {
	memR := &MempoolReactor{
		config:  config,
		Mempool: mempool,
	}
	memR.BaseReactor = *p2p.NewBaseReactor("MempoolReactor", memR)
	return memR
}

EvidenceReactor

Evidence 是一个接口,表示验证人的任何可证明的恶意活动,主要有 DuplicateVoteEvidence (包含验证人签署两个相互矛盾的投票的证据。)这种实现。

NewNode() 中相关代码:

evidenceDB, err := dbProvider(&DBContext{"evidence", config})
if err != nil {
   return nil, err
}
evidenceLogger := logger.With("module", "evidence")

// EvidenceStore 用来存储见过的所有 Evidence,包括已提交的、已经过验证但没有广播的以及已经广播但未提交的
evidenceStore := evidence.NewEvidenceStore(evidenceDB)
// EvidencePool 在 EvidenceStore 中维护一组有效的 Evidence
evidencePool := evidence.NewEvidencePool(stateDB, evidenceStore)
evidencePool.SetLogger(evidenceLogger)

// 创建 EvidenceReactor
evidenceReactor := evidence.NewEvidenceReactor(evidencePool)
evidenceReactor.SetLogger(evidenceLogger)

EvidenceReactor 用来在 peer 间对 EvidencePoolEvidence 进行广播。

type EvidenceReactor struct {
   p2p.BaseReactor
   evpool   *EvidencePool
    
   eventBus *types.EventBus
}

BlockchainReactor

NewNode() 中相关代码:

blockExecLogger := logger.With("module", "state")

// BlockExecutor 用来处理区块执行和状态更新。
// 它暴露一个 ApplyBlock() 方法,用来验证并执行区块、更新状态和 ABCI 应答,然后以原子方式提交
// 并更新 mempool,最后保存状态。
blockExec := sm.NewBlockExecutor(stateDB, blockExecLogger, proxyApp.Consensus(), mempool, evidencePool)

// 创建 BlockchainReactor
bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))

BlockchainReactor 用来处理长期的 catchup 同步。

type BlockchainReactor struct {
	p2p.BaseReactor

	// immutable
	initialState sm.State

	blockExec *sm.BlockExecutor
    
    // 区块的底层存储。主要存储三种类型的信息:BlockMeta、Block part 和 Commit
	store     *BlockStore
    
    // 当加入到 BlockPool 时,peer 自己报告它们的高度。
    // 从当前节点最新的 pool.height 开始,从报告的高于我们高度的 peer 顺序请求区块。
    // 节点经常问 peer 他们当前的高度,这样我们就可以继续前进。
    // 不断请求更高的区块直到到达限制。如果大多数请求没有可用的 peer,并且没有处在 peer 限制,可以
    // 切换到 consensus reactor
	pool      *BlockPool
	fastSync  bool

	requestsCh <-chan BlockRequest
	errorsCh   <-chan peerError
}

ConsensusReactor

NewNode() 中相关代码:

ConsensusState 用来处理共识算法的执行。它处理投票和提案,一旦达成一致,将区块提交给区块链并针对 ABCI 应用执行它们。内部状态机从 peer、内部验证人和定时器接收输入。

// 创建 ConsensusState
consensusState := cs.NewConsensusState(config.Consensus, state.Copy(),
   blockExec, blockStore, mempool, evidencePool)
consensusState.SetLogger(consensusLogger)

if privValidator != nil {
    // 设置 PrivValidator 用来投票
   consensusState.SetPrivValidator(privValidator)
}

// 创建 ConsensusReactor
consensusReactor := cs.NewConsensusReactor(consensusState, fastSync)
consensusReactor.SetLogger(consensusLogger)

ConsensusReactor 用于共识服务。

type ConsensusReactor struct {
   p2p.BaseReactor // BaseService + p2p.Switch

   conS *ConsensusState

   mtx      sync.RWMutex
   fastSync bool
   eventBus *types.EventBus
}

把 reactor 添加到 Switch

NewNode() 中相关代码:

把上面创建的四个 Reactor 添加到 Switch 中。

p2pLogger := logger.With("module", "p2p")

sw := p2p.NewSwitch(config.P2P)
sw.SetLogger(p2pLogger)
sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
sw.AddReactor("EVIDENCE", evidenceReactor)

PEXReactor(可选的)

看代码之前先了解几个概念:

  • seeds:启动节点时可通过 --p2p.seeds 标签来指定种子节点,可以从中获得许多其它 peer 的地址。

    tendermint node --p2p.seeds "[email protected]:46656,[email protected]:46656"
  • persistent_peers:可指定与当前节点保持持久连接的一组节点。或使用 RPC 端点 /dial_peers 来指定而无需停止 Tendermint 实例。

    tendermint node --p2p.persistent_peers "[email protected]:46656,[email protected]:46656"
    curl 'localhost:46657/dial_peers?persistent=true&peers=\["[email protected]:46656","[email protected]:46656"\]'
  • PEX:peer-exchange 协议的缩写,默认是开启的,在第一次启动后通常不需要种子。peer 之间将传播已知的 peer 并形成一个网络。peer 地址存储在 addrbook 中。

如果 PEX 模式是打开的,它应该处理种子的拨号,否则将由 Switch 来处理。

NewNode() 中相关代码:

// 创建 addrBook
addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))

// 如果开启了 PEX 模式
if config.P2P.PexReactor {
   // 创建 PEX reactor
   pexReactor := pex.NewPEXReactor(addrBook,
      &pex.PEXReactorConfig{
         Seeds:          cmn.SplitAndTrim(config.P2P.Seeds, ",", " "),
         SeedMode:       config.P2P.SeedMode,
         PrivatePeerIDs: cmn.SplitAndTrim(config.P2P.PrivatePeerIDs, ",", " ")})
   pexReactor.SetLogger(p2pLogger)
   // 添加到 Switch
   sw.AddReactor("PEX", pexReactor)
}

sw.SetAddrBook(addrBook)

PEXReactor 处理 PEX 并保证足够数量的 peer 连接到 Switch。用 AddrBook 存储 peer 的 NetAddress。为防止滥用,只接受来自 peer 的 pexAddrsMsg,我们也发送了相应的 pexRequestMsg。每个 defaultEnsurePeersPeriod 时间段内只接收一个 pexRequestMsg

FilterPeers

配置中的 config.FilterPeers 字段用来指定当连接到一个新 peer 时是否要查询 ABCI 应用,由应用用来决定是否要保持连接。

使用 ABCI 查询通过 addr 或 pubkey 来过滤 peer,如果返回 OK 则添加 peer。

NewNode() 中相关代码:

if config.FilterPeers {
   // 设置两种类型的 Filter
   sw.SetAddrFilter(func(addr net.Addr) error {
      resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: cmn.Fmt("/p2p/filter/addr/%s", addr.String())})
      if err != nil {
         return err
      }
      if resQuery.IsErr() {
         return fmt.Errorf("Error querying abci app: %v", resQuery)
      }
      return nil
   })
   sw.SetIDFilter(func(id p2p.ID) error {
      resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: cmn.Fmt("/p2p/filter/pubkey/%s", id)})
      if err != nil {
         return err
      }
      if resQuery.IsErr() {
         return fmt.Errorf("Error querying abci app: %v", resQuery)
      }
      return nil
   })
}

设置 EventBus

EventBus 是一个通过此系统的所有事件的事件总线,所有的调用都代理到底层的 pubsub 服务器。所有事件都必须用 EventBus 来发布,以保证正确的数据类型。

type EventBus struct {
	cmn.BaseService
    
    // Server 允许 client 订阅/取消订阅消息,带或不带 tag 发布消息,并管理内部状态
	pubsub *tmpubsub.Server
}

NewNode() 中相关代码:

eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))

// 将要发布和/或订阅消息(事件)的服务
// consensusReactor 将在 consensusState 和 blockExecutor 上设置 eventBus
consensusReactor.SetEventBus(eventBus)

设置 TxIndexer

TxIndexer 是一个接口,定义了索引和搜索交易的方法。它有两种实现,一个是 kv.TxIndex,由键值存储支持(levelDB),另一个是 null.TxIndex,即不设置索引(默认的)。

IndexerService 会把 TxIndexerEventBus 连接在一起,以对来自 EventBus 的交易进行索引。

type IndexerService struct {
	cmn.BaseService

	idr      TxIndexer
	eventBus *types.EventBus
}

NewNode() 中相关代码:

var txIndexer txindex.TxIndexer
switch config.TxIndex.Indexer {
// 键值存储索引
case "kv":
   // 创建 DB
   store, err := dbProvider(&DBContext{"tx_index", config})
   if err != nil {
      return nil, err
   }
   // 有指定要索引的标签列表(以逗号分隔)
   if config.TxIndex.IndexTags != "" {
      txIndexer = kv.NewTxIndex(store, kv.IndexTags(cmn.SplitAndTrim(config.TxIndex.IndexTags, ",", " ")))
   // 要索引所有标签
   } else if config.TxIndex.IndexAllTags {
      txIndexer = kv.NewTxIndex(store, kv.IndexAllTags())
   // 不索引标签
   } else {
      txIndexer = kv.NewTxIndex(store)
   }
default:
   txIndexer = &null.TxIndex{}
}

// 创建 IndexerService
indexerService := txindex.NewIndexerService(txIndexer, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))

创建节点的 BaseService

节点所需的所有字段内容已在以上部分创建完毕,在创建 BaseService 后,就可以启动节点了。

node.BaseService = *cmn.NewBaseService(logger, "Node", node)

创建节点总结

总结一下 NewNode 函数都做了哪些事情:

  • 创建 Tendermint 与 ABCI 应用建立 mempoolconsensusquery 连接所需的客户端。
  • Tendermint 节点与应用程序执行握手,确保其状态是同步的。
  • 根据配置 config.FastSyncprivValidator.GetAddress() 方法,判断是否需要快速同步,是否是验证人节点。
  • 创建并在 Switch 中设置 Reactor,即 MempoolReactorEvidenceReactorBlockchainReactorConsensusReactor 以及 PEXReactor 这五种。用来在 peer 上接收不同类型的消息。
  • 根据配置 config.FilterPeers 判断是否要用 ABCI 查询通过 addr 或 pubkey 来过滤要新连接的 peer,如果返回 OK 则添加 peer。
  • 创建并设置 EventBus,订阅/发布事件。
  • 设置 TxIndexer,对交易进行索引。

启动节点

回到 startInProcess 函数,在这里创建完毕节点后,执行 Start 方法,实际执行的是节点的 OnStart 方法。

接下来就看这个方法:

// 日志:Starting Node    module=node impl=Node
func (n *Node) OnStart() error {
    // 日志:Starting EventBus    module=events impl=EventBus
	err := n.eventBus.Start()
	if err != nil {
		return err
	}

     // 监听 P2P 连接的端口
	// 日志:Local listener    module=p2p ip=:: port=46656
    // 日志:Could not perform UPNP discover    module=p2p err="write udp4 0.0.0.0:63731->239.255.255.250:1900: i/o timeout"
    // 日志:Starting DefaultListener    module=p2p impl=Listener(@169.254.237.47:46656)
	protocol, address := cmn.ProtocolAndAddress(n.config.P2P.ListenAddress)
	l := p2p.NewDefaultListener(protocol, address, n.config.P2P.SkipUPNP, n.Logger.With("module", "p2p"))
	n.sw.AddListener(l)

	// 生成节点的 PrivKey
    // 日志:P2P Node ID    module=node ID=3158bddb4e03bef94bf4a99543af5beab4733368 file=/Users/LLLeon/.basecoind/config/node_key.json
	nodeKey, err := p2p.LoadOrGenNodeKey(n.config.NodeKeyFile())
	if err != nil {
		return err
	}
	n.Logger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", n.config.NodeKeyFile())

	nodeInfo := n.makeNodeInfo(nodeKey.ID())
	n.sw.SetNodeInfo(nodeInfo)
	n.sw.SetNodeKey(nodeKey)

	// 将自己添加到 addrbook 以防止连接自己
    // 日志: Add our address to book    module=p2p book=/Users/LLLeon/.basecoind/config/addrbook.json [email protected]:46656
	n.addrBook.AddOurAddress(nodeInfo.NetAddress())
    
    // 在 P2P 服务器之前启动 RPC 服务器,所以可以例如:接收第一个区块的交易
    // 日志:Starting RPC HTTP server on tcp://0.0.0.0:46657 module=rpc-server
	if n.config.RPC.ListenAddress != "" {
		listeners, err := n.startRPC()
		if err != nil {
			return err
		}
		n.rpcListeners = listeners
	}

	// 启动 switch (P2P 服务器),函数内部依次启动了各 Reactor,
    // Switch 会在 46656 端口(默认的)监听 peer 的连接,连接后会通过 Reactor 的 Receive 方法
    // 接收来自 peer 的消息
    
    // 日志:Starting P2P Switch    module=p2p impl="P2P Switch"
    // 日志:Starting BlockchainReactor    module=blockchain impl=BlockchainReactor
    // 日志:Starting ConsensusReactor    module=consensus impl=ConsensusReactor
    // 日志:ConsensusReactor    module=consensus fastSync=false
    // 日志:Starting ConsensusState    module=consensus impl=ConsensusState
    // 日志:Starting baseWAL    module=consensus wal=/Users/LLLeon/.basecoind/data/cs.wal/wal impl=baseWAL
    // 日志:Catchup by replaying consensus messages    module=consensus height=169315
    // 日志:Replay: Done    module=consensus
    // 日志:Starting EvidenceReactor    module=evidence impl=EvidenceReactor
    // 日志:Starting PEXReactor    module=p2p impl=PEXReactor
    // 日志:Starting AddrBook    module=p2p book=/Users/LLLeon/.basecoind/config/addrbook.json impl=AddrBook
    // 日志:enterNewRound(169315/0). Current: 169315/0/RoundStepNewHeight module=consensus height=169315 round=0
    // 日志:enterPropose(169315/0). Current: 169315/0/RoundStepNewRound module=consensus height=169315 round=0
    // 日志:enterPropose: Our turn to propose            module=consensus height=169315 round=0 proposer=FF2AF6957DCD4B2FA8373D2157DE67278C5F0E41 privValidator="PrivValidator{FF2AF6957DCD4B2FA8373D2157DE67278C5F0E41 LH:169314, LR:0, LS:3}"
    // 日志:Starting MempoolReactor    module=mempool impl=MempoolReactor
	err = n.sw.Start()
	if err != nil {
		return err
	}

	// 始终连接到持久 peers
	if n.config.P2P.PersistentPeers != "" {
		err = n.sw.DialPeersAsync(n.addrBook, cmn.SplitAndTrim(n.config.P2P.PersistentPeers, ",", " "), true)
		if err != nil {
			return err
		}
	}

	// 启动交易的 indexer
    // 日志:Starting IndexerService    module=txindex impl=IndexerService
	return n.indexerService.Start()
}

至此,节点中各模块已经启动完毕,接下来进入 catchup、共识协议等核心处理逻辑。

Tendermint 核心处理逻辑

这部分包括区块的 catchup、共识协议处理等内容。

这部分逻辑是在 ConsensusReactor 启动时处理的。

日志

Starting ConsensusReactor                    module=consensus impl=ConsensusReactor
ConsensusReactor                             module=consensus fastSync=false
Starting ConsensusState                      module=consensus impl=ConsensusState
Starting baseWAL                             module=consensus wal=/Users/LLLeon/.basecoind/data/cs.wal/wal impl=baseWAL
Starting TimeoutTicker                       module=consensus impl=TimeoutTicker
Catchup by replaying consensus messages      module=consensus height=169315
Replay: Done                                 module=consensus

先看 ConsensusReactor 启动的代码:

func (conR *ConsensusReactor) OnStart() error {
   conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync())
   if err := conR.BaseReactor.OnStart(); err != nil {
      return err
   }

   // 订阅这几种类型的事件:EventNewRoundStep、EventVote 和 EventProposalHeartbeat,一旦接收到这些
   // 类型的消息就会用在 state 中定义的 pubsub 广播给 peer
   conR.subscribeToBroadcastEvents()

    // 由于只有我们一个节点,所以会设置 FastSync = false,会启动 ConsensusState
   if !conR.FastSync() {
      err := conR.conS.Start()
      if err != nil {
         return err
      }
   }

   return nil
}

现在看 ConsensusState 的启动:

func (cs *ConsensusState) OnStart() error {
	if err := cs.evsw.Start(); err != nil {
		return err
	}

	// we may set the WAL in testing before calling Start,
	// so only OpenWAL if its still the nilWAL
	if _, ok := cs.wal.(nilWAL); ok {
		walFile := cs.config.WalFile()
        
         // 这里会启动 baseWAL,它在处理消息之前会将其写入磁盘。可用于崩溃恢复和确定性重放
		wal, err := cs.OpenWAL(walFile)
		if err != nil {
			cs.Logger.Error("Error loading ConsensusState wal", "err", err.Error())
			return err
		}
		cs.wal = wal
	}

	// we need the timeoutRoutine for replay so
	// we don't block on the tick chan.
	// NOTE: we will get a build up of garbage go routines
	// firing on the tockChan until the receiveRoutine is started
	// to deal with them (by that point, at most one will be valid)
    
    // 用来对每一步的超时进行控制
    // 里面是在协程里面执行 timeoutRoutine 方法,内部会监听 timeoutTicker.tickChan,进行到下一步时
    // 会中止并重置旧 timer,并更新 timeoutInfo。tickChan 上超时时间为 0 时,会立即转发到 tockChan
    // 日志:Starting TimeoutTicker    module=consensus impl=TimeoutTicker
	if err := cs.timeoutTicker.Start(); err != nil {
		return err
	}

	// we may have lost some votes if the process crashed
	// reload from consensus log to catchup
    
    // 新建 ConsensusState 时已设置为 true
	if cs.doWALCatchup {
         // catchup:仅重放自上一个区块以来的那些消息
         // 日志:Catchup by replaying consensus messages      module=consensus height=169315
         // 日志:Replay: Done                                 module=consensus
		if err := cs.catchupReplay(cs.Height); err != nil {
			cs.Logger.Error("Error on catchup replay. Proceeding to start ConsensusState anyway", "err", err.Error())
			// NOTE: if we ever do return an error here,
			// make sure to stop the timeoutTicker
		}
	}

    // 核心处理逻辑,主要的协程,详细解释见下面
	go cs.receiveRoutine(0)

	// schedule the first round!
	// use GetRoundState so we don't race the receiveRoutine for access
	cs.scheduleRound0(cs.GetRoundState())

	return nil
}

这个方法里面主要做了区块的 catchup、在协程中启动 receiveRoutine() 方法来接收并处理消息,以及执行 scheduleRound0() 方法来进入新回合,下面逐一说明。

catchup

在这次启动节点之前,已经运行过一段时间,区块最新高度为 169315。首先看节点在重启后是如何 catchup 到此高度的。

catchupReplay 源码:

func (cs *ConsensusState) catchupReplay(csHeight int64) error {

   // Set replayMode to true so we don't log signing errors.
   cs.replayMode = true
   defer func() { cs.replayMode = false }()

   // Ensure that #ENDHEIGHT for this height doesn't exist.
   // NOTE: This is just a sanity check. As far as we know things work fine
   // without it, and Handshake could reuse ConsensusState if it weren't for
   // this check (since we can crash after writing #ENDHEIGHT).
   //
   // Ignore data corruption errors since this is a sanity check.
   gr, found, err := cs.wal.SearchForEndHeight(csHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
   if err != nil {
      return err
   }
   if gr != nil {
      if err := gr.Close(); err != nil {
         return err
      }
   }
   if found {
      return fmt.Errorf("WAL should not contain #ENDHEIGHT %d", csHeight)
   }

   // Search for last height marker.
   //
   // Ignore data corruption errors in previous heights because we only care about last height
   gr, found, err = cs.wal.SearchForEndHeight(csHeight-1, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
   if err == io.EOF {
      cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1)
   } else if err != nil {
      return err
   }
   if !found {
      return fmt.Errorf("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d", csHeight, csHeight-1)
   }
   defer gr.Close() // nolint: errcheck

   cs.Logger.Info("Catchup by replaying consensus messages", "height", csHeight)

   var msg *TimedWALMessage
   dec := WALDecoder{gr}

   for {
      msg, err = dec.Decode()
      if err == io.EOF {
         break
      } else if IsDataCorruptionError(err) {
         cs.Logger.Debug("data has been corrupted in last height of consensus WAL", "err", err, "height", csHeight)
         panic(fmt.Sprintf("data has been corrupted (%v) in last height %d of consensus WAL", err, csHeight))
      } else if err != nil {
         return err
      }

      // NOTE: since the priv key is set when the msgs are received
      // it will attempt to eg double sign but we can just ignore it
      // since the votes will be replayed and we'll get to the next step
      if err := cs.readReplayMessage(msg, nil); err != nil {
         return err
      }
   }
   cs.Logger.Info("Replay: Done")
   return nil
}

receiveRoutine

核心处理逻辑:需要重点看一下 cs.receiveRoutine() 方法。

这个方法处理可能引起状态转换的消息,参数 maxSteps 表示退出前要处理的消息数量,0 表示永不退出。它持有 RoundState 并且是唯一更新它的地方。更新发生在超时、完成提案和 2/3 多数时。 ConsensusState 在任意内部状态更新之前必须是锁定的。

这个方法在单独的协程里面,接收并处理来自 peer、节点内部或超时消息。

func (cs *ConsensusState) receiveRoutine(maxSteps int) {
   defer func() {
      if r := recover(); r != nil {
         cs.Logger.Error("CONSENSUS FAILURE!!!", "err", r, "stack", string(debug.Stack()))
      }
   }()

   for {
      // 到达设定的 maxSteps 时退出
      if maxSteps > 0 {
         if cs.nSteps >= maxSteps {
            cs.Logger.Info("reached max steps. exiting receive routine")
            cs.nSteps = 0
            return
         }
      }
      rs := cs.RoundState
      var mi msgInfo

      select {
      // TxsAvailable 返回一个通道,每添加一个交易到 mempool 后会触发一次,
      // 并且只有在 mempool 中的交易可用时才会触发。
      // 如果没有调用 EnableTxsAvailable,返回的 channel 可能为 nil。
      case height := <-cs.mempool.TxsAvailable():
          
          // 这里会执行 enterPropose(),进入提议环节,完成后会执行 enterPrevote 进入 Prevote 环节
          // 日志:enterPropose(169315/0). Current: 169315/0/RoundStepNewRound module=consensus height=169315 round=0
          // 日志:enterPropose: Our turn to propose            module=consensus height=169315 round=0 proposer=FF2AF6957DCD4B2FA8373D2157DE67278C5F0E41 privValidator="PrivValidator{FF2AF6957DCD4B2FA8373D2157DE67278C5F0E41 LH:169314, LR:0, LS:3}"
         cs.handleTxsAvailable(height)
          
      // 接收到来自 peer 的消息
      case mi = <-cs.peerMsgQueue:
         cs.wal.Write(mi)
         // handles proposals, block parts, votes
         // may generate internal events (votes, complete proposals, 2/3 majorities)
         // 处理消息
         cs.handleMsg(mi)
          
      // 接收到来自内部的消息
      case mi = <-cs.internalMsgQueue:
         // 发送签名的消息前写入磁盘
         cs.wal.WriteSync(mi) // NOTE: fsync
         // handles proposals, block parts, votes
         cs.handleMsg(mi)
          
      // 接收到超时消息
      case ti := <-cs.timeoutTicker.Chan(): // tockChan:
         cs.wal.Write(ti)
         // if the timeout is relevant to the rs
         // go to the next step
         cs.handleTimeout(ti, rs)
      case <-cs.Quit():

         // NOTE: the internalMsgQueue may have signed messages from our
         // priv_val that haven't hit the WAL, but its ok because
         // priv_val tracks LastSig

         // close wal now that we're done writing to it
         cs.wal.Stop()

         close(cs.done)
         return
      }
   }
}

scheduleRound0

scheduleRound0() 方法会将当前 consensus H/R/S 封装成 timeoutInfo 发送给 tickChan,这就进入了 timeoutTicker.timeoutRoutine 的逻辑。

这里发送的信息第 0 回合,步骤为 RoundStepNewHeight

scheduleRound0 源码:

func (cs *ConsensusState) scheduleRound0(rs *cstypes.RoundState) {
	
	sleepDuration := rs.StartTime.Sub(time.Now())
	cs.scheduleTimeout(sleepDuration, rs.Height, 0, cstypes.RoundStepNewHeight)
}

timeoutTicker.timeoutRoutine 源码:

func (t *timeoutTicker) timeoutRoutine() {
   t.Logger.Debug("Starting timeout routine")
   var ti timeoutInfo
   for {
      select {
      case newti := <-t.tickChan:
         t.Logger.Debug("Received tick", "old_ti", ti, "new_ti", newti)

         // ignore tickers for old height/round/step
         if newti.Height < ti.Height {
            continue
         } else if newti.Height == ti.Height {
            if newti.Round < ti.Round {
               continue
            } else if newti.Round == ti.Round {
               if ti.Step > 0 && newti.Step <= ti.Step {
                  continue
               }
            }
         }

         // stop the last timer
         t.stopTimer()

         // update timeoutInfo and reset timer
         // NOTE time.Timer allows duration to be non-positive
         ti = newti
         t.timer.Reset(ti.Duration)
         t.Logger.Debug("Scheduled timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
      case <-t.timer.C:
         t.Logger.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
         // go routine here guarantees timeoutRoutine doesn't block.
         // Determinism comes from playback in the receiveRoutine.
         // We can eliminate it by merging the timeoutRoutine into receiveRoutine
         // and managing the timeouts ourselves with a millisecond ticker
         go func(toi timeoutInfo) { t.tockChan <- toi }(ti)
      case <-t.Quit():
         return
      }
   }
}

receiveRoutine 中,如果触发了超时,会执行 handleTimeout,进行状态转移相关操作:

func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) {
	cs.Logger.Debug("Received tock", "timeout", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)

	// timeouts must be for current height, round, step
	if ti.Height != rs.Height || ti.Round < rs.Round || (ti.Round == rs.Round && ti.Step < rs.Step) {
		cs.Logger.Debug("Ignoring tock because we're ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step)
		return
	}

	// the timeout will now cause a state transition
	cs.mtx.Lock()
	defer cs.mtx.Unlock()

	switch ti.Step {
	case cstypes.RoundStepNewHeight:
		// NewRound event fired from enterNewRound.
		// XXX: should we fire timeout here (for timeout commit)?
		cs.enterNewRound(ti.Height, 0)
	case cstypes.RoundStepNewRound:
		cs.enterPropose(ti.Height, 0)
	case cstypes.RoundStepPropose:
		cs.eventBus.PublishEventTimeoutPropose(cs.RoundStateEvent())
		cs.enterPrevote(ti.Height, ti.Round)
	case cstypes.RoundStepPrevoteWait:
		cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent())
		cs.enterPrecommit(ti.Height, ti.Round)
	case cstypes.RoundStepPrecommitWait:
		cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent())
		cs.enterNewRound(ti.Height, ti.Round+1)
	default:
		panic(cmn.Fmt("Invalid timeout step: %v", ti.Step))
	}

}

共识协议处理

共识投票和生成区块等环节是在 receiveRoutine 中收到消息或超时后由 handleMsghandleTimeout 执行相应方法进行处理。

本文只提供一个阅读 Tendermint 源码的入口,其它核心逻辑,比如创建区块等逻辑,需要自己深入 consensus 包。

Golang 中互斥锁 Mutex 的源码实现

本文基于 go1.11 版本。

Mutex 使用

在深入源码之前,要先搞清楚一点,对 Golang 中互斥锁 sync.Mutex 的操作是程序员的主动行为,可以看作是是一种协议,而不是强制在操作前必须先获取锁。

这样说可能有点抽象,看下面这段代码:

package main

import (
	"fmt"
	"sync"
	"time"
)

type People struct {
	mux  sync.Mutex
	Name string
	Age  uint8
}

func (p *People) IncAge() {
	p.mux.Lock()

	time.Sleep(3 * time.Second)
	p.Age++

	p.mux.Unlock()
}

func main() {
	leo := &People{Name: "leo", Age: 18}

	innerIncTime := time.Now().Second()
	fmt.Println("with mutex  inc time:", innerIncTime)
	go leo.IncAge()

	time.Sleep(time.Second)
	outerIncTime := time.Now().Second()
	fmt.Println("without mutex inc time:", outerIncTime)
	leo.Age++
	fmt.Println("without mutex inc result:", leo.Age)

	fmt.Println("mutex status:", leo.mux)
	time.Sleep(2 * time.Second)
	fmt.Println("with mutex inc result:", leo.Age)
	fmt.Println("Two seconds later mutex status:", leo.mux)
}

在执行 leo.Age++ 之前已经加锁了,如果是需要强制获取锁的话,这里会等待 3 秒直到锁释放后才能执行,而这里没有获取锁就可以直接对 Age 字段进行操作,输出结果:

with mutex  inc time: 19
without mutex inc time: 20
without mutex inc result: 19
mutex status: {1 0}
with mutex inc result: 20
Two seconds later mutex status: {0 0}

所以,如果在一个 goroutine 中对锁执行了 Lock(),在另一个 goroutine 可以不用理会这个锁,直接进行操作(当然不建议这么做)。

还有一点需要注意的是,锁只和具体变量关联,与特定 goroutine 无关。虽然可以在一个 goroutine 中加锁,在另一个 goroutine 中解锁(如通过指针传递变量,或全局变量都可以),但还是建议在同一个代码块中进行成对的加锁解锁操作。

源码分析

这是 Mutex 的 源码链接

Mutex 结构

Mutex 表示一个互斥锁,其零值就是未加锁状态的 mutex,无需初始化。在首次使用后不要做值拷贝,这样可能会使锁失效。

type Mutex struct {
	state int32  // 表示锁当前的状态
	sema  uint32 // 信号量,用于向处于 Gwaitting 的 G 发送信号
}

几个常量

这里用位操作来表示锁的不同状态。

const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota

	starvationThresholdNs = 1e6
)

####mutexLocked

值为 1,第一位为 1,表示 mutex 已经被加锁。根据 mutex.state & mutexLocked 的结果来判断 mutex 的状态:该位为 1 表示已加锁,0 表示未加锁。

mutexWoken

值为 2,第二位为 1,表示 mutex 是否被唤醒。根据 mutex.state & mutexWoken 的结果判断 mutex 是否被唤醒:该位为 1 表示已被唤醒,0 表示未被唤醒。

mutexStarving

值为 4,第三位为 1,表示 mutex 是否处于饥饿模式。根据 mutex.state & mutexWoken 的结果判断 mutex 是否处于饥饿模式:该位为 1 表示处于饥饿模式,0 表示正常模式。

mutexWaiterShift

值为 3,表示 mutex.state 右移 3 位后即为等待的 goroutine 的数量。

starvationThresholdNs

值为 1000000 纳秒,即 1ms,表示将 mutex 切换到饥饿模式的等待时间阈值。这个常量在源码中有大篇幅的注释,理解这段注释对理解程序逻辑至关重要,翻译整理如下:

引入这个常量是为了保证出现 mutex 竞争情况时的公平性。mutex 有两种操作模式:正常模式和饥饿模式

正常模式下,等待者以 FIFO 的顺序排队来获取锁,但被唤醒的等待者发现并没有获取到 mutex,并且还要与新到达的 goroutine 们竞争 mutex 的所有权。新到达的 goroutine 们有一个优势 —— 它们已经运行在 CPU 上且可能数量很多,所以一个醒来的等待者有很大可能会获取不到锁。在这种情况下它处在等待队列的前面。如果一个 goroutine 等待 mutex 释放的时间超过 1ms,它就会将 mutex 切换到饥饿模式。

在饥饿模式下,mutex 的所有权直接从对 mutex 执行解锁的 goroutine 传递给等待队列前面的等待者。新到达的 goroutine 们不要尝试去获取 mutex,即使它看起来是在解锁状态,也不要试图自旋(等也白等,在饥饿模式下是不会给你的),而是自己乖乖到等待队列的尾部排队去。

如果一个等待者获得 mutex 的所有权,并且看到以下两种情况中的任一种:1) 它是等待队列中的最后一个,或者 2) 它等待的时间少于 1ms,它便将 mutex 切换回正常操作模式。

正常模式有更好地性能,因为一个 goroutine 可以连续获得好几次 mutex,即使有阻塞的等待者。而饥饿模式可以有效防止出现位于等待队列尾部的等待者一直无法获取到 mutex 的情况。

自旋锁操作

在开始看 Mutex 源码前要先介绍几个与自旋锁相关的函数,源码中通过这几个函数实现了对自旋锁的操作。这几个函数实际执行的代码都是在 runtime 包中实现的。

runtime_canSpin

代码具体位置

由于 Mutex 的特性,自旋需要比较保守的进行,原因参考上面 starvationThresholdNs 常量的注释。

限制条件是:只能自旋少于 4 次,而且仅当运行在多核机器上并且 GOMAXPROCS>1;最少有一个其它正在运行的 P,并且本地的运行队列 runq 里没有 G 在等待。与 runtime mutex 相反,不做被动自旋,因为可以在全局 runq 上或其它 P 上工作。

// Active spinning for sync.Mutex.
//go:linkname sync_runtime_canSpin sync.runtime_canSpin
//go:nosplit
func sync_runtime_canSpin(i int) bool {
	// sync.Mutex is cooperative, so we are conservative with spinning.
	// Spin only few times and only if running on a multicore machine and
	// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
	// As opposed to runtime mutex we don't do passive spinning here,
	// because there can be work on global runq or on other Ps.
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

runtime_doSpin

代码具体位置同上。

执行自旋操作,这个函数是用汇编实现的,函数内部循环调用 PAUSE 指令。PAUSE 指令什么都不做,但是会消耗 CPU 时间。

/go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}

runtime_SemacquireMutex

代码具体位置。发送获取到 Mutex 的信号。

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool) {
	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile)
}

runtime_Semrelease

代码具体位置同上。发送释放 Mutex 的信号。

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool) {
	semrelease1(addr, handoff)
}

Lock

这是 Lock 方法的全部源码,先看一下整体逻辑,下面会分段解释:

首先是直接调用 CAS 尝试获取锁,如果获取到则将锁的状态从 0 切换为 1 并返回。获取不到就进入 for 循环,通过自旋来等待锁被其它 goroutine 释放,只有两个地方 break 退出 for 循环而获取到锁。

源码实现分析

刚进入函数,会尝试获取锁:

if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}

直接通过调用 CompareAndSwapInt32 这个方法来检查锁的状态是否是 0,如果是则表示可以加锁,将其状态转换为 1,当前 goroutine 加锁成功,函数返回。

CompareAndSwapInt32 这个方法是汇编实现的,在单核CPU上运行是可以保证原子性的,但在多核 CPU 上运行时,需要加上 LOCK 前缀来对总线加锁,从而保证了该指令的原子性。

至于 race.Enabled ,这里是判断是否启用了竞争检测,即程序编译或运行时是否加上了 -race 子命令。关于竞争检测如想深入了解可以看官方博客,见参考目录 1。在这里可以不用理会。

如果 mutex 已经被其它 goroutine 持有,则进入下面的逻辑。先定义了几个变量:

var waitStartTime int64 // 当前 goroutine 开始等待的时间
starving := false       // mutex 当前的所处的模式
awoke := false          // 当前 goroutine 是否被唤醒
iter := 0               // 自旋迭代的次数
old := m.state          // 保存 mutex 当前状态

进入 for 循环后,先检查是否可以进行自旋:

如上所述,不要在饥饿模式下进行自旋,因为在饥饿模式下只有等待者们可以获得 mutex 的所有权,这时自旋是不可能获取到锁的。

能进入执行自旋逻辑部分的条件:当前不是饥饿模式,而且当前还可以进行自旋(见上面的 runtime_canSpin 函数)。

然后是判断能否唤醒当前 goroutine 的四个条件:根据 1)!awoke 和 2)old&mutexWoken == 0 来判断当前 goroutine 还没有被唤醒;3)old>>mutexWaiterShift != 0 表示还有其它在等待的 goroutine;4)如果当前 goroutine 状态还没有变,就将其状态切换为 old|mutexWoken, 即唤醒状态 。

// old&(mutexLocked|mutexStarving) == mutexLocked 表示 mutex 当前不处于饥饿模式。
// 即 old & 0101 == 0001,old 的第一位必定为 1,第三位必定为 0,即未处于饥饿模式。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
    // 这时自旋是有意义的,通过把 mutexWoken 标识为 true,以通知 Unlock 方法就先不要叫醒其它
    // 阻塞着的 goroutine 了。
	if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
		atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
		awoke = true
	}
    // 将当前 goroutine 标识为唤醒状态后,执行自旋操作,计数器加一,将当前状态记录到 old,继续循环等待
	runtime_doSpin()
	iter++
	old = m.state
	continue
}

如果不能进行自旋操作,进入下面的逻辑:

如果 mutex 当前处于正常模式,将 new 的第一位即锁位设置为 1;如果 mutex 当前已经被加锁或处于饥饿模式,则当前 goroutine 进入等待队列;如果 mutex 当前处于饥饿模式,而且 mutex 已被加锁,则将 new 的第三位即饥饿模式位设置为 1。

new := old
// 不要尝试获取处于饥饿模式的锁,新到达的 goroutine 们必须排队。
if old&mutexStarving == 0 {
	new |= mutexLocked
}
// 没有获取到锁,当前 goroutine 进入等待队列。
// old & 0101 != 0,那么 old 的第一位和第三位至少有一个为 1,即 mutex 已加锁或处于饥饿模式。
if old&(mutexLocked|mutexStarving) != 0 {
	new += 1 << mutexWaiterShift
}
// 当前 goroutine 将 mutex 切换到饥饿模式。但如果当前 mutex 是解锁状态,不要切换。
// Unlock 期望处于饥饿模式的 mutex 有等待者,在这种情况下不会这样。
if starving && old&mutexLocked != 0 {
	new |= mutexStarving
}

设置好 new 后,继续下面的逻辑:

当 goroutine 被唤醒时,如果 new 还没有被唤醒,则发生了不一致的 mutex 状态,抛出错误;否则就重置 new 的第二位即唤醒位为 0。

if awoke {
	if new&mutexWoken == 0 {
		throw("sync: inconsistent mutex state")
	}
	new &^= mutexWoken
}

接下来会调用 CAS 来将 mutex 当前状态由 old 更新为 new:

如更新成功,old&(mutexLocked|mutexStarving) == 0 表示 mutex 未锁定且未处于饥饿模式,则 break 跳出循环,当前 goroutine 获取到锁。

如果当前的 goroutine 之前已经在排队了,就排到队列的前面。runtime_SemacquireMutex(&m.sema, queueLifo) 这个函数就是做插队操作的,如果 queueLifo == true,就把当前 goroutine 插入到等待队列的前面。

继续往下,如果 mutex 当前是处于饥饿模式,则修改等待的 goroutine 数量和第三位即饥饿模式位,break 跳出循环,当前 goroutine 获取到锁;如果是正常模式,继续循环。

if atomic.CompareAndSwapInt32(&m.state, old, new) {
    // old & 0101 == 0,old 的第一位和第三位必定不是 1,即 mutex 未锁定且未处于饥饿模式。
	if old&(mutexLocked|mutexStarving) == 0 {
		break // locked the mutex with CAS
	}
  
	// If we were already waiting before, queue at the front of the queue.
	queueLifo := waitStartTime != 0
	if waitStartTime == 0 {
        // 之前没排过队,开始计时。
		waitStartTime = runtime_nanotime()
	}
	runtime_SemacquireMutex(&m.sema, queueLifo)
  
    // 确定 mutex 当前所处模式
	starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
	old = m.state
	if old&mutexStarving != 0 {
		// If this goroutine was woken and mutex is in starvation mode,
		// ownership was handed off to us but mutex is in somewhat
		// inconsistent state: mutexLocked is not set and we are still
		// accounted as waiter. Fix that.
		if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
			throw("sync: inconsistent mutex state")
		}
		delta := int32(mutexLocked - 1<<mutexWaiterShift)
        // 如果不是饥饿模式或只剩一个等待者了,退出饥饿模式
		if !starving || old>>mutexWaiterShift == 1 {
			// Exit starvation mode.
			// Critical to do it here and consider wait time.
			// Starvation mode is so inefficient, that two goroutines
			// can go lock-step infinitely once they switch mutex
			// to starvation mode.
			delta -= mutexStarving
		}
		atomic.AddInt32(&m.state, delta)
		break
	}
    // 未处于饥饿模式,让新到来的 goroutine 先获取锁,继续循环
	awoke = true
	iter = 0
} else {
    // 上面的 CAS 没有成功更新为 new,记录当前状态,继续循环
	old = m.state
}

Unlock

Unlock 的代码比较少,直接在代码中注释:

func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// Fast path: drop lock bit.
    // 将 mutex 当前状态第一位即锁位置为 0 保存到 new
	new := atomic.AddInt32(&m.state, -mutexLocked)
    // 当 new 状态锁位为 1 时会满足此条件,即对未加锁状态的 mutex 进行解锁,抛出错误
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
    // 如果未处于饥饿模式
	if new&mutexStarving == 0 {
		old := new
		for {
            // 如果没有等待者,或者一个 goroutine 已经被唤醒或获取到锁,或处于饥饿模式,
            // 无需唤醒任何其它被挂起的 goroutine。
            // 在饥饿模式中,所有权直接从执行解锁的 goroutine 传递给下一个等待者。
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}

            // 等待者数量减 1,并将唤醒位改成 1
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
                // 唤醒一个阻塞的 goroutine,但不是唤醒第一个等待者
				runtime_Semrelease(&m.sema, false)
				return
			}
			old = m.state
		}
	} else {
        // 饥饿模式:将 mutex 所有权传递给下个等待者。
        // 注意:mutexLocked 没有设置,等待者将在被唤醒后设置它。
        // 但是如果设置了 mutexStarving,仍然认为 mutex 是锁定的,所以新来的 goroutine 不会获取到它。
		runtime_Semrelease(&m.sema, true)
	}
}

参考

  1. Data Race Detector
  2. Golang 互斥锁内部实现
  3. 信号量,锁和 golang 相关源码分析

sync.Once 源码阅读

在群里看到有人讨论 Once 的实现细节,也看了下源码,记录一下。

Once 的一般使用场景:确保初始化操作只执行一次。

直接看源码吧:

package sync

import (
	"sync/atomic"
)

type Once struct {
	done uint32
	m    Mutex
}

func (o *Once) Do(f func()) {
	// 下面是一个 Do 的错误实现:
	//
	//	if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
	//		f()
	//	}
	//
	// Do 要确保它返回时,f 已经执行完毕。
  // 而这个实现不能确保这一点。考虑这个情况:
  //   1. 当有多个 goroutine 来执行 Do 时,goroutine1 判断 o.done 为 0,
  // 将其修改为 1 后开始执行 f。
  //   2. goroutine2 判断 o.done 为 1,直接返回,此时 f 中的资源不一定初始化完毕。
  //   3. 如果此时 goroutine2 就开始使用 f 未初始化完毕的资源,可能会出现意外情况。
	if atomic.LoadUint32(&o.done) == 0 {
		o.doSlow(f)
	}
}

// 接上面,这就是为什么 doSlow 里面使用了 mutex。考虑这个情况:
//   1. goroutine1 原子加载 o.done,此时值为 0,开始执行 doSlow。goroutine1 先获取到锁,
// 执行 f 但未返回。
//   2. goroutine2 此时也原子加载 o.done,由于 goroutine1 还未修改其值,所以还为 0。
// 也执行 doSlow,但获取不到锁,阻塞在这里。
//   3. goroutine1 执行完毕 f,返回前将 o.done 设为 1 并释放锁。
//   4. goroutine 2 获取到锁,判断 o.done 为 1,直接返回。此时 f 中的资源已初始化完毕,可以使用。
func (o *Once) doSlow(f func()) {
	o.m.Lock()
	defer o.m.Unlock()
	if o.done == 0 {
    // 由于要确保 f 执行完毕后(不管成功或失败)再修改 o.done 的值,所以使用 defer。
    // 后续对 Do 的调用,都不会执行 f。
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

Kubernetes scheduler 源码阅读

Kubernetes scheduler 源码阅读

本文基于 Kubernetes 1.20 版本。

Kubernetes 调度器负责将 Pod 调度到集群内的节点上,它监听 API Server,查询还未分配 Node 的 Pod,然后根据调度策略为这些 Pod 分配节点(更新 Pod 的 NodeName 字段)。

0. 调度框架简介

调度框架是 Kubernetes Scheduler 的一种可插入架构,可以简化调度器的自定义。 它向现有的调度器增加了一组新的“插件” API。插件被编译到调度器程序中。 这些 API 允许大多数调度功能以插件的形式实现,同时使调度“核心”保持简单且可维护。

调度框架定义了一些扩展点。调度器插件注册后在一个或多个扩展点处被调用。 这些插件中的一些可以改变调度决策,而另一些仅用于提供信息。

每次调度一个 Pod 的尝试都分为两个阶段,即 调度周期绑定周期

  • 调度周期为 Pod 选择一个节点,绑定周期将该决策应用于集群。 调度周期和绑定周期一起被称为“调度上下文”。
  • 调度周期是串行运行的,而绑定周期可以并发运行。
  • 如果确定 Pod 不可调度或者存在内部错误,则可以终止调度周期或绑定周期。 Pod 将返回队列并重试。

scheduling-framework-extensions

以上简介内容来自官方文档,下面开始看源码。

Framework是一个接口,要实现此接口需要实现上面提到的各扩展点的方法:

framework 结构实现了 Framework 接口:

type framework struct {
	registry              Registry
	snapshotSharedLister  schedulerlisters.SharedLister
	waitingPods           *waitingPodsMap
	pluginNameToWeightMap map[string]int
	queueSortPlugins      []QueueSortPlugin
	preFilterPlugins      []PreFilterPlugin
	filterPlugins         []FilterPlugin
	preScorePlugins       []PreScorePlugin
	scorePlugins          []ScorePlugin
	reservePlugins        []ReservePlugin
	preBindPlugins        []PreBindPlugin
	bindPlugins           []BindPlugin
	postBindPlugins       []PostBindPlugin
	unreservePlugins      []UnreservePlugin
	permitPlugins         []PermitPlugin

	clientSet       clientset.Interface
	informerFactory informers.SharedInformerFactory
	volumeBinder    *volumebinder.VolumeBinder

	metricsRecorder *metricsRecorder

	// Indicates that RunFilterPlugins should accumulate all failed statuses and not return
	// after the first failure.
	runAllFilters bool
}

1. 启动 kube-scheduler 进程

首先找到 scheduler 的入口:cmd/kube-scheduler/scheduler.go 中的 main 函数,代码编译后,可通过终端命令启动程序。

command := app.NewSchedulerCommand()
// ...
if err := command.Execute(); err != nil {
		os.Exit(1)
	}

进入下一个函数:cmd/kube-scheduler/app/server.go 中的 Run 函数:

Run 函数做的是基于给定的配置信息和 registryOptions 运行 scheduler,只有在出错或终端命令终止时退出:

// 因为 runCommand() 中设置了 context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

简单看一下 Scheduler 里面几个关键字段,先不必深入细节:

// 监控未调度的 Pod, 找到合适的节点, 并将绑定信息写回 API Server.
type Scheduler struct {
  // 通过 SchedulerCache 所做的变更会被 NodeLister 和 Algorithm 观察到.
	SchedulerCache internalcache.Cache
	Algorithm core.ScheduleAlgorithm
	podConditionUpdater podConditionUpdater
	
  // 用来驱逐 Pods 和更新抢占者 Pod 的 NominatedNode 字段.
	podPreemptor podPreemptor

  // 函数实现应该是阻塞的, 直到有可用的 Pod 才返回.
  // 此函数不使用 channel, 因为调度 Pod 会花一些时间, Pod 放在 channel 中可能会时间过长而 stale.
	NextPod func() *framework.PodInfo
	Error func(*framework.PodInfo, error)

	// Close this to shut down the scheduler.
	StopEverything <-chan struct{}

  // 处理 Pod PVC/PV 的绑定
	VolumeBinder *volumebinder.VolumeBinder

  // 是否禁用 Pod 抢占.
	DisablePreemption bool

  // 待调度 Pod 的队列, 后面会详细介绍
	SchedulingQueue internalqueue.SchedulingQueue

	// Profiles are the scheduling profiles.
	Profiles profile.Map
	scheduledPodsHasSynced func() bool
}

回到主线,Run 函数:

// 初始化调度框架的插件注册表
// ...
// 创建 Scheduler 结构
sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.PodInformer,
		recorderFactory,
		ctx.Done(),
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
		scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
	)
// 准备事件广播器
// 设置 healthz 检查并启动该服务
// 另起一个 goroutine 运行 informers
// ...
// 运行 scheduler
sched.Run(ctx)

sched.Run 方法开始进行 Pod 调度:

// 执行监听和调度, 等待缓存同步完毕后, 阻塞的执行调度,直到 context cancel。
func (sched *Scheduler) Run(ctx context.Context) {
	if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
		return
	}
  // Pod 队列操作, 里面起了两个 goroutine
	sched.SchedulingQueue.Run()
  // sched.scheduleOne 实现 Pod 的调度逻辑
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}

下面分开看 Pod 队列的操作和 Pod 调度。

2. 优先级队列 PriorityQueue

sched.SchedulingQueue.Run() 方法将 Pod 在不同的队列中进行移动。SchedulingQueue 是一个 interface,实现此接口的队列用来存储等待调度的 Pod。

实现此接口的结构是 PriorityQueue,它主要包含三个队列:

  • activeQ:调度器从 activeQ 队列寻找 Pod 来调度,队列最前面的 Pod 的优先级最高。
  • podBackoffQ:Pod 调度失败后放入此队列,里面的 Pod 按退避到期顺序 (即重试时间) 排序。在调度器从 activeQ 寻找 Pod 之前,会从 podBackoffQ 中弹出退避完成 (即到达重试时间) 的 Pod 进行重新调度。
  • unschedulableQ:是一个 map,用来存储那些已尝试并确定无法调度的 Pod。

简单看一下数据结构:

type PriorityQueue struct {
	lock sync.RWMutex // 此结构是非线程安全的
	
  // Heap 是一个 map+slice 实现的队列.
	activeQ *heap.Heap
	podBackoffQ *heap.Heap
	unschedulableQ *UnschedulablePodsMap
  
  // nominatedPods 是一个 map, 用来存储被提名的在节点上运行的 Pod.
	nominatedPods *nominatedPodMap
  // 表示调度周期的序号, 当有一个 Pod 从 activeQ 弹出时将加 1.
	schedulingCycle int64
  // 收到一个移动请求时, 缓存调度周期的序号. 
  // 当收到移动请求时, 如果尝试去调度在这个调度周期之前和之中的不可调度的 Pod, 它们将被放回到 activeQueue.
	moveRequestCycle int64
}

实际执行的是 PriorityQueue 的 Run 方法,它负责将 Pod 在三个队列中进行移动:

// wait.Until 函数来保证 flushBackoffQCompleted 和 flushUnschedulableQLeftover 两个方法失败时会不断重试.
func (p *PriorityQueue) Run() {
  // 每 1 秒执行一次 flushBackoffQCompleted 方法, 直到收到停止信号.
	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
  // 每 30 秒执行一次 flushUnschedulableQLeftover 方法, 直到收到停止信号.
	go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

PriorityQueue.flushBackoffQCompleted 方法做的事情是把 BackoffQ 里面到达重试时间的 Pod 放回到 activeQ:

func (p *PriorityQueue) flushBackoffQCompleted() {
	// 省略加解锁代码(什么情况下会发生锁竞争?)
  for {
    rawPodInfo := p.podBackoffQ.Peek()
    
    // ...
    // 查看队列头部的 Pod 是否到达重新调度时间 (根据尝试次数计算), 未到达则 return.
    // 这里是先查看 Pod 符合 pop 的条件后才执行真正的 pop 动作.
    // ...
    
		_, err := p.podBackoffQ.Pop()
    // 入 activeQ 队列
		p.activeQ.Add(rawPodInfo)
    // 唤醒所有等待从 activeQ 队列 pop Pod 的 goroutine (sched.scheduleOne 方法会调用 sched.NextPod() 来获取 Pod)
		defer p.cond.Broadcast()
	}
}

PriorityQueue.flushUnschedulableQLeftover 方法会把在 unschedulableQ 队列中存放时间超过 unschedulableQTimeInterval (60 秒) 的 Pod 移到 podBackoffQ 或 activeQ。

func (p *PriorityQueue) flushUnschedulableQLeftover() {
  // 加锁
	// ...
  // 遍历 map 寻找到达重是时间的 Pod 放入 podsToMove
  // ...
  
	if len(podsToMove) > 0 {
    // 主要逻辑, 正在等待 backoff 时间的 Pod 会放入 podBackoffQ, 否则放入 activeQ
    // UnschedulableTimeout 表示一个事件, 用来统计 metrics
		p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
	}
}

接下来看 Pod 真正的调度逻辑。

3. Pod 调度

Pod 的调度逻辑在 Scheduler.scheduleOne 方法中实现。大体思路是先找到合适的节点,缓存必要信息,假定 Pod 已经运行在该节点上,真正的绑定操作是异步进行的。

看一下它的调度逻辑:

  1. 从队列中获取一个待调度的 Pod。

  2. 获取此 Pod 所属调度器的 Profile,包括根据给定的配置创建的 Framework:

    // pkg/scheduler/profile/profile.go
    type Profile struct {
    	framework.Framework
    	Recorder events.EventRecorder
    }
    
    // pkg/scheduler/apis/config/types.go
    type KubeSchedulerProfile struct {
    	SchedulerName string
      // 调度器要用到的插件
    	Plugins *Plugins
    	PluginConfig []PluginConfig
    }
  3. 创建一个 CycleState 结构供插件读写数据,各插件的数据可以互相读写。

    // pkg/scheduler/framework/v1alpha1/cycle_state.go
    type CycleState struct {
    	mx      sync.RWMutex
    	storage map[StateKey]StateData
    	// if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle.
    	recordPluginMetrics bool
    }
  4. 调用 sched.Algorithm.Schedule() 方法,经过调用一系列插件的过滤及打分,得到一个符合的节点。

    • 运行 Prefilter 插件集。
    • 调用 findNodesThatFitPod() 方法,经过 Filter 和 Extender 的过滤,得到符合条件的节点列表。
    • 运行 Prescore 插件集,都成功后进行后续逻辑。
    • 调用 prioritizeNodes() 方法执行 Score 插件集,也可以运行任何的 extender。
      • 每个插件的分数加在一起,就是一个节点的总分。
      • 返回 NodeScore 列表。
    • 选择一个得分最高的节点。
  5. 复制一份 Pod 信息,假定该 Pod 已经运行在选定的节点上,即使还没有绑定它们。这样调度器可以继续调度其它 Pod,而无需等待绑定操作完成(绑定操作是异步进行的)。

  6. 调用 AssumePodVolumes() 方法缓存 Pod 的节点选择。如果需要 PVC 绑定,则只在内存中缓存。

    • AssumePodVolumes 将把缓存的匹配 PV 和 PVC 提供给 podBindingCache 中的所选节点。
    • 用新的预绑定 PV 更新 pvCache。
    • 用新的带 annotations 集合的 PVC 更新 pvcCache。
    • 用为 PV 和 PVC 缓存的 API 更新再次更新 podBindingCache 。
  7. 运行 Reserve 插件。

  8. 调用 assume 方法来把 assumedPod 缓存起来,缓存前它会设置 assumed.Spec.NodeName = scheduleResult.SuggestedHost,即所谓的 Pod 绑定信息。

  9. 调用 RunPermitPlugins() 方法运行 Permit 插件集。Permit 插件用于防止或延迟 Pod 的绑定。

    • 如果返回的不是 Success 或 Wait,将不会继续执行剩下的 Permit 插件并返回错误。
    • 如果有任一插件返回的是 Wait,在所有插件运行完后,会创建一个 waitingPod(已经开始) 并将其放入 waitingPods map 里(后面会用到),随后返回。
    • 如果都返回 Success,则继续后面的异步绑定操作。
  10. 起一个 goroutine 进行绑定:

    • 执行 WaitOnPermit 方法在 Permit 阶段等待 Pod。
    • 绑定 Volumes。
      • 它使用前面假定的绑定更新 API,并等待 PV 控制器完成绑定操作。
      • 如果绑定失败,触发 un-reserve 插件来清除 Reserved Pod 的相关状态。
    • 执行 prebind 插件。同上面一样,失败后会触发 un-reserve 插件来清除 Reserved Pod 的相关状态。
    • 执行 bind 操作。
      • 绑定的优先级:先执行 extender 再执行 framework 的 Bind 插件。其实就是把 Pod 与 Node 的绑定信息发送给 API Server 处理。
      • 绑定成功后,会调用 finishBinding() 方法使缓存的 Pod 过期。
    • 如果绑定失败,触发 un-reserve 插件来清除 Reserved Pod 的相关状态。
    • 执行 Postbind 插件。

至此,调度一个 Pod 的逻辑就梳理完毕了,不过,还有一些细节需要再梳理。

4. 参考

  1. 官方文档

解决虚拟机无法下载 k8s.gcr.io 镜像的问题

解决虚拟机无法下载 k8s.gcr.io 镜像的问题

背景

两种方式:

  • 有代理的宿主机先下载好镜像再 scp 到虚拟机 OS 中。
  • 给虚拟机 OS 中的 docker 添加代理。

环境:

  • 宿主机系统:macOS 10.15.7
  • 虚拟机软件:VirtualBox
  • 虚拟机系统:CentOS 7

问题:

在虚拟机上安装 ingress-nginx-controller 时,状态一直是 ImagePullBackOff。

查看该 Pod 用到的镜像:

> kubectl get pod ingress-nginx-controller-67897c9494-zjf9r -n ingress-nginx -o yaml | grep image
docker pull k8s.gcr.io/ingress-nginx/controller:v0.44.0@sha256:3dd0fac48073beaca2d67a78c746c7593f9c575168a17139a9955a82c63c4b9a

1. 方法一

在宿主机下载好镜像,依次保存,上传到虚拟机,加载,一通操作后发现加载好的镜像的 REPOSITORY 和 TAG 都是 none,这还行?查了下发现是因为 save 镜像时是使用的 docker save <image id> 命令,它会清除 REPOSITORY 和 TAG 的值,改为使用 docker save <repo>:<tag> 即可。

错误示范:

docker save -o 435df390f367 ingress-nginx-controller.tar

以下是没问题的操作。

1.1. 宿主机下载镜像

宿主机有梯子的话,这步应该没什么问题,直接 pull 即可。

docker pull k8s.gcr.io/ingress-nginx/controller:v0.44.0@sha256:3dd0fac48073beaca2d67a78c746c7593f9c575168a17139a9955a82c63c4b9a

不过下载好后的镜像没有 tag,又打了个 tag:

docker tag 435df390f367 k8s.gcr.io/ingress-nginx/controller:v0.44.0

1.2. 保存镜像

保存镜像:

docker save -o ingress-nginx-controller.tar k8s.gcr.io/ingress-nginx/controller:v0.44.0

1.3. 上传、加载镜像

上传镜像:

scp ingress-nginx-controller.tar [email protected]:/root/workspace

加载镜像:

docker load -i ingress-nginx-controller.tar

查看 Docker 镜像,一切正常。

注意:别忘了每个节点上都要上传镜像。

1.4. 查看 Kubernetes 运行情况

查看 Pod ingress-nginx-controller 的状态,依然是 ImagePullBackOff...

镜像的 TAG 也和之前查看的结果一致,都是 v0.44.0。难道是后面的 sha256 值不一样?

查看虚拟机里面镜像 k8s.gcr.io/ingress-nginx/controller 的 sha256 值:

docker inspect 435df390f367

果然和最开始查到的不一致。

修改 Deployment ingress-nginx-controller 用到的镜像,删除镜像后面的 sha256 值:

kubectl edit deployment ingress-nginx-controller -n ingress-nginx

现在再查看 Pod 状态,已经开始正常运行了。

2. 方式二

前提:

  • VirtualBox 的网络模式为桥接模式。
  • 宿主机代理软件的 允许局域网连接 已打开。

在执行docker pull时,是由守护进程dockerd来执行。因此,代理需要配在dockerd的环境中。而这个环境,则是受systemd所管控,因此实际是systemd的配置。

创建如下文件:

sudo mkdir -p /etc/systemd/system/docker.service.d
sudo touch /etc/systemd/system/docker.service.d/proxy.conf

在 proxy.conf 中添加以下内容:

[Service]
Environment="HTTP_PROXY=http://proxy.example.com:8080/"
Environment="HTTPS_PROXY=http://proxy.example.com:8080/"

重载 systemd 并重启 dockerd

sudo systemctl daemon-reload
sudo systemctl restart docker

测试效果:

> docker pull k8s.gcr.io/kube-proxy:v1.19.2
> docker images

验证有效。

使用 docker-compose 部署 MongoDB 副本集

1. 副本集概述

某些情况下,副本可以提供更高的读取容量,就像客户端可以发送读操作到不同的服务器。在不同数据中心维护数据副本可以增加分布式应用的数据局部性和可用性。还可以因为其它目的保存额外的副本,比如灾难恢复、报告或备份。

MongoDB 中的副本

  1. 一个副本集就是一组维护相同数据集的 mongod 实例。

  2. 一个数据集包含一些数据承载节点和一个可选的仲裁节点。数据承载节点中,有且只能有一个被认为是主承载节点,而其它节点被认为是次要节点。主节点接收所有写入操作。主节点将对其数据集所做的所有更改记录到其 oplog。

  3. 次要节点复制主节点的 oplog 并将操作应用到其数据集,就跟次要数据集反映了主数据集一样。如果主节点不可用,一个合格的次要节点将被选为新的主节点。

  4. 可以添加一个额外的 mongod 实例到副本集中来作为仲裁节点。仲裁节点不维护数据集。仲裁节点通过响应副本集其它成员的心跳和选举请求来达到维护副本集中法定成员数量的目的。因为它不需要存储数据集,比带有数据集的全功能副本集成员消耗更少的资源,所以它是一种提供副本集仲裁功能比较好的方式。如果你的副本集有偶数个成员,添加一个仲裁节点在主节点选举中来获得一个主节点的选票。仲裁节点不需要专用硬件。

  5. 仲裁节点将永远是仲裁节点,但主节点可能变为次要节点,次要节点也可能通过选举成为主要节点。

异步复制

  1. 次要节点异步的应用来自主节点的操作。通过在主节点之后应用操作,副本集可以不管一个或多个成员的失败而继续实现其功能。

自动故障切换

  1. 当主节点超过十秒没有与副本集的其它成员通信,一个合格的次要节点将举行选举来将它自己选举为新的主节点。第一个次要节点举行选举,并在接收多数成员的投票后成为主节点。
  2. MongoDB 3.2 版本后引入了版本 1 的复制协议,它减少了副本集的故障切换时间并加速了多个同时存在的主节点的检测。
  3. 虽然时间不同,故障转移过程一般在一分钟之内完成。

读操作

  1. 默认的,客户端从主节点进行读操作,然而可以通过指定读偏好来将读操作发送到次要节点。次要节点的异步复制意味着从次要节点进行的读操作可能返回没有反映主节点上数据状态的数据。
  2. MongoDB 中,客户端可以在写操作持久化之前看到其结果。

额外的特性

副本集提供多种支持应用需求的选项。可以部署成员在多个数据中心的副本集,或通过调整一些成员的 members[n].priority 来控制选举主节点的结果。副本集也支持专用于报告、灾难恢复或备份功能的成员。

副本集成员

  1. 主节点接收所有写操作。
  2. 次要节点从主节点复制操作来保持与主节点相同的数据集。次要节点可以有特殊配置文件的附加配置。比如,次要节点可以对选举弃权或优先级为 0。
  3. 副本集的推荐最低配置是带有三个数据承载点三成员副本集:一个主节点和两个次要节点。也可以部署为两个数据承载点的三成员副本集:一个主节点,一个次要节点和一个仲裁节点。但这种模式不如三数据承载点的模式冗余性好。
  4. 3.0 版本的变化:副本集最多可以有 50 名成员,但只有 7 名投票成员。

主节点

  1. 主节点是副本集中接收写操作的唯一成员。MongoDB 在主节点上应用写操作,然后将操作记录在主节点的 oplog 上。次要节点复制 oplog 并将操作应用到它们的数据集上。

次要节点

  1. 次要节点维护主节点数据集的一个副本。复制数据时,次要节点在异步过程中从主节点的 oplog 应用操作到它自己的数据集。副本集可以有一个或多个次要节点。
  2. 次要节点可以通过配置来达到实现一些效果:
    • 防止被选举为主节点,以便允许它留在次要节点数据中心或作为冷备用。
    • 防止应用从中读取,以便允许它运行需要与正常流量分离的应用。
    • 保存一个运行的历史快照,以便从某些错误中恢复,如误删数据库。

仲裁节点

  1. 仲裁节点没有数据集的副本,并且不能变成主节点。副本集可以有仲裁节点来在选举主节点中投票。仲裁节点永远有一张选票,这样就允许副本集的投票成员数量不均等,而无需承担一个复制数据的成员的额外开销。
  2. 重要:不要在还承载副本集的主节点或次要节点的系统上运行仲裁节点。

副本集 oplog

  1. oplog:操作日志,一个特殊的固定集合,保持所有修改数据库上数据的操作的滚动记录。
  2. 副本集的所有成员都包含一份 oplog 的副本,在 local.oplog.rs 中,这允许它们维护数据库的当前状态。
  3. 为了减轻复制的困难,所有的副本集成员成员都发送心跳(ping)到所有其它成员。
  4. 任何成员都可以从其它成员那里导入 oplog。
  5. oplog 中的每个操作都是幂等(idempotent)的。即,oplog 对目标资料应用不管一次或多次操作,都产生相同的结果。

oplog 大小

  1. 当第一次开始一个副本集成员时,MongoDB 以默认大小创建一个 oplog。
  2. 如果可以预料副本集工作量的大小,可以将 oplog 设置为比默认值大些。相反,如果应用主要用来执行读操作和少量写操作,一个小的 oplog 可能就足够了。
  3. 以下情况可能需要较大的 oplog:
    • 一次更新多个文档
    • 删除与插入量大致相同的数据时
    • 大量的就地更新

oplog 状态

  1. 使用 rs.printReplicationInfo() 方法来查看 oplog 状态,包括其大小和操作的时间范围。

2. 在 docker 中创建副本集

准备工作

  1. 安装 docker

  2. 下载 mongo 镜像,如有需求可加上版本号

    docker pull mongo

概览

  1. 我们要从 mongo 镜像创建三个容器,都处在它们的 docker 容器网络内。
  2. 这三个容器将被命名为 mongo1、mongo2 和 mongo3。它们将作为副本集的三个 mongo 实例。
  3. 暴露它们的端口到本地机器,以便可以从本地机器的 mongo shell 来访问它们中的任意一个。
  4. 这三个 mongo 容器中的每一个都应该能与这个网络中的所有其它容器通信。

建立网络

  1. 创建一个名为 my-mongo-cluster 的网络:

    docker network create my-mongo-cluster
  2. 查看当前系统中的网络:

    docker network ls

创建容器

运行以下命令启动第一个容器:

docker run \
-p 30001:27017 \
--name mongo1 \
--net my-mongo-cluster \
mongo mongod --replSet my-mongo-set
  1. docker run: 从镜像启动容器
  2. -p 30001:27017:暴露容器中的 27017 端口,映射到本机的 30001 端口
  3. --name mongo1:给这个容器命名为 mongo1
  4. --net my-mongo-cluster:将此容器添加到 my-mongo-cluster 网络
  5. mongo:用来生成此容器的镜像名称
  6. mongod --replSet my-mongo-set:执行 mongod 命令,以将此 mongo 实例添加到名称为 my-mongo-se 的副本集。

启动其余两个容器:

docker run \
-p 30002:27017 \
--name mongo2 \
--net my-mongo-cluster \
mongo mongod --replSet my-mongo-set

docker run \
-p 30003:27017 \
--name mongo3 \
--net my-mongo-cluster \
mongo mongod --replSet my-mongo-set

配置副本集

现在我们需要的 mongo 实例已经运行起来了,现在来把它们编程副本集。

  1. 这条命令将在运行的容器 mongo1 中打开 mongo shell:

    docker exec -it mongo1 mongo
  2. 在 mongo shell 中进行配置:

    > db = (new Mongo('localhost:27017')).getDB('test')
    test
    > config = {
      	"_id" : "my-mongo-set",
      	"members" : [
      		{
      			"_id" : 0,
      			"host" : "mongo1:27017"
      		},
      		{
      			"_id" : 1,
      			"host" : "mongo2:27017"
      		},
      		{
      			"_id" : 2,
      			"host" : "mongo3:27017"
      		}
      	]
      }
    • 第一个 _id 键应当和 —replSet 标签为 mongo 实例设置的值一样,这个例子中是 my-mongo-set。
    • 接下来列出了所有想放到副本集中的成员。
    • 将所有 mongo 实例添加到 docker 网络。在 my-mongo-cluster 网络中根据每个容器的名称各自分配到 ip 地址。
  3. 通过以下命令启动副本集:

    rs.initiate(config)

    如果一切顺利,提示符将变成这样:

    my-mongo-set:PRIMARY>

    这意味着 shell 现在与 my-mongo-set 集群中的 PRIMARY 数据库进行了关联。

  4. 下面测试副本集是否工作,先在 primary 数据库中插入数据:

    > db.mycollection.insert({name : 'sample'})
    WriteResult({ "nInserted" : 1 })
    > db.mycollection.find()
    { "_id" : ObjectId("57761827767433de37ff95ee"), "name" : "sample" }
  5. 然后新建一个与 secondary 数据库的连接,并测试文档是否在那里复制:

    > db2 = (new Mongo('mongo2:27017')).getDB('test')
    test
    > db2.setSlaveOk()
    > db2.mycollection.find()
    { "_id" : ObjectId("57761827767433de37ff95ee"), "name" : "sample" }

    执行 db2.setSlaveOk() 命令来让 shell 知道我们故意在查询非 primary 的数据库。

3. 使用 Dockerfile 构建副本集镜像

mongo 副本集基础镜像

  1. Dockerfile

    FROM ubuntu:14.04
    MAINTAINER Jia chenhui
    ENV REFRESHED_AT 2017-0726
    
    RUN sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 0C49F3730359A14518585931BC711F9BA15703C6
    RUN echo "deb [ arch=amd64 ] http://repo.mongodb.org/apt/ubuntu trusty/mongodb-org/3.4 multiverse" | sudo tee /etc/apt/sources.list.d/mongodb-org-3.4.list
    RUN sudo apt-get update
    RUN sudo apt-get install -y mongodb
    
    EXPOSE 27017
    CMD []
  2. 构建基础镜像

    docker build -t mongo_set_base .

mongo 副本集镜像

  1. Dockerfile

    FROM mongo_set_base
    MAINTAINER Jia chenhui
    ENV REFRESHED_AT 2017-07-26
    
    RUN mkdir -p /data/db
    
    ENTRYPOINT [ "usr/bin/mongod", "--replSet", "my-mongo-set" ] 
  2. 构建副本集镜像

    docker build -t mongod .
  3. 从镜像 mongod 启动容器 mongo1

    docker run \
    -p 30001:27017 \
    --name mongo1 \
    --net my_mongo_cluster \
    mongod
  4. 顺利的话会看到提示

    [rsStart] replSet can't get local.system.replset config from self or any seed (EMPTYCONFIG)

    这是因为还没有配置副本集,继续往下执行

mongo 副本集次要节点

从镜像 mongod 启动容器,这里启动两个容器 mongo2 和 mongo3,作为副本集次要节点

docker run \
-p 30002:27017 \
--name mongo2 \
--net my_mongo_cluster \
mongod

docker run \
-p 30003:27017 \
--name mongo3 \
--net my_mongo_cluster \
mongod

配置副本集 config

  1. 以交互模式进入主节点的 shell(可以连接任一容器)

    docker exec -it mongo1 mongo
  2. 执行以下命令

    db = (new Mongo('localhost:27017')).getDB('test')
    
    config = {
        "_id" : "my-mongo-set",
        "members" : [
            {
                "_id" : 0,
                "host" : "mongo1:27017"
            },
            {
                "_id" : 1,
                "host" : "mongo2:27017"
            },
            {
                "_id" : 2,
                "host" : "mongo3:27017"
            }
        ]
    }
    
    rs.initiate(config)
  3. 如果一切正常,会出现以下提示,表示副本集已创建成功,目前在主节点中

    my-mongo-set:PRIMARY>

测试副本集

  1. 插入一条文档

    db.mycollection.insert({name : 'sample'})
  2. 查看是否成功插入

    db.mycollection.find()
  3. 另开一个 terminal 连接次要节点,查询文档是否与主节点插入的文档一致

    docker exec -it mongo2 mongo
    db2 = (new Mongo('mongo2:27017')).getDB('test')
    
    db2.setSlaveOk()
    
    db2.mycollection.find()
    

    到这里就完成了一个主节点两个次要节点副本集的创建。

4. 使用 docker-compose 构建副本集服务

Dockerfile

FROM mongo:3.4.1
MAINTAINER Jia Chenhui
ENV REFRESHED_AT 2017-07-27

ADD . /replicaset
WORKDIR /replicaset

RUN mkdir -p /replicaset/data/db

EXPOSE 27017
ENTRYPOINT [ "mongod", "--replSet", "my-mongo-set" ]

compose.yml

version: '2'
services:
    primary:
        build: .
        image: mongoset:primary
        ports:
            - "30001:27017"
        volumes:
            - .:/replicaset
        networks:
            - my_mongo_cluster

    replica1:
        build: .
        image: mongoset:replica1
        ports:
            - "30002:27017"
        volumes:
            - .:/replicaset
        networks:
            - my_mongo_cluster

    replica2:
        build: .
        image: mongoset:replica2
        ports:
            - "30003:27017"
        volumes:
            - .:/replicaset
        networks:
            - my_mongo_cluster

networks:
    my_mongo_cluster:

创建服务

docker-compose up

进入 mongo shell 配置并初始化副本集

config = {
    "_id" : "my-mongo-set",
    "members" : [
        {
            "_id" : 0,
            "host" : "compose_primary_1:27017"
        },
        {
            "_id" : 1,
            "host" : "compose_replica1_1:27017"
        },
        {
            "_id" : 2,
            "host" : "compose_replica2_1:27017"
        }
    ]
}

rs.initiate(config)

使用 mgo 已经可以正常插入和查询数据

在 mongo shell 中可以参照 “2. 在 docker 中创建副本集” 相关步骤

Go 代码评审意见

此文为翻译文章,这是原文地址

此页面收集在审核 Go 代码期间所做的常见注释,以便可以用缩写来指代详细的解释。这是一长串常见的错误,不是一个全面的风格指南。

你可以把这当作是 Effective Go 的补充。

  • Gofmt
  • [Comment Sentences](#Comment Sentences)
  • Contexts
  • Copying
  • [Declaring Empty Slices](#Declaring Empty Slices)
  • [Crypto Rand](#Crypto Rand)
  • [Doc Comments](#Doc Comments)
  • [Dont't Panic](#Don't Panic)
  • [Error Strings](#Error Strings)
  • Examples
  • [Goroutine Lifetimes](#Goroutine Lifetimes)
  • [Handle Errors](#Handle Errors)
  • Imports
  • [Import Dot](#Import Dot)
  • [In-Bank Errors](#In-Band Errors)
  • [Indent Error Flow](#Indent Error Flow)
  • Initialisms
  • Interfaces
  • [Line Length](#Line Length)
  • [Mixed Caps](#Mixed Caps)
  • [Named Result Parameters](#Named Result Parameters)
  • [Naked Returns](#Naked Returns)
  • [Package Comments](#Package Comments)
  • [Package Names](#Package Names)
  • [Pass Values](#Pass Values)
  • [Receiver Names](#Receiver Names)
  • [Receiver Type](#Receiver Type)
  • [Synchronous Functions](#Synchronous Functions)
  • [Useful Test Failures](#Useful Test Failures)
  • [Variable Names](#Variable Names)

Gofmt

在你的代码中运行 gofmt 来自动修复大多数机械样式的问题。几乎所有的 Go 代码都在使用 gofmt。本文档的其余部分将讨论非机械式的点。

另一种选择是使用 goimportsgofmt 的超集,它根据需要添加(和删除)导入行。

Comment Sentences

查看 https://golang.org/doc/effective_go.html#commentary。注释记录声明应该是完整的句子,即使这看起来有点多余。这种方法使它们在被提取到 godoc 文档时有良好的风格。注释应当以被描述的事物的名称开始,以句号结尾:

// Request represents a request to run a command.
type Request struct { ...

// Encode writes the JSON encoding of req to w.
func Encode(w io.Writer, req *Request) { ...

诸如此类。

Contexts

context.Context 类型的值携带安全证书,跟踪信息,截止期限,和跨 API 和进程边界的取消信号。Go 程序在整个函数调用链中显式传递 Context,从传入的 RPC 和 HTTP 请求到传出的请求。

大多数使用 Context 的函数应当以它作为它们的第一个参数:

func F(ctx context.Context, /* other arguments */) {}

一个从不特定于请求的函数可以使用 context.Background(),但即使你认为不需要,也可以用 Context 来传递 err。默认情况是传递一个 Context,如果你有足够的理由说明另一个选择是错误的,也可以直接使用 context.Background()

不要向 struct 类型添加 Context 成员,而是给该类型中每个需要传递 Context 的方法添加一个 ctx 参数。一个例外是某个方法的签名必须与标准库或第三方库中的接口匹配。

不要在函数签名中创建自定义的 Context 类型或使用 Context 以外的接口。

如果你需要传递应用程序数据,把它放到一个参数中,在 receiver 中,全局变量中,或者它真的属于这里,放到一个 Context 值中。

Context 是不可变的,因此将相同的 ctx 传递给多个分享相同的截止期限、取消信号、证书和 parent 追踪等内容的调用是没有问题的。

Copying

为避免意外的别名使用,从另一个包拷贝 struct 时要小心。例如,bytes.Buffer 类型包含 []byte 切片,并且作为小字符串的优化,一个切片可以引用的小字节数组。如果你拷贝一个 Buffer,副本中的切片可能是原始数组的别名,导致随后的方法调用产生惊人的效果。

通常,如果其方法与指针类型 * T 相关联,则不要复制类型 T 的值。

Declaring Empty Slices

当声明一个空切片时,更推荐:

var t []string

而不是:

t := []string{}

前者声明一个 nil 切片值,而后者为非 nil 值,但长度却为 0。它们在功能上是等价的 —— 它们的 lencap 都是 0 —— 但 nil 切片是首选风格。

注意,在有限的情况下,非 nil 但长度为 0 的切片是首选的,比如在编码 JSON 对象时(一个 nil 切片编码为 null,而 []string{} 编码为 JSON 数组 [])。

当设计接口时,避免区分 nil 切片和非 nil 的 0 长度切片,因为这可能导致微妙的编程错误。

更多关于 Go 中 nil 的讨论请参见 Francesc Campoy 的演讲:Understanding Nil

Crypto Rand

不要使用包 math/rand 来生成键,即使是一次性的。未加 seed 的,生成器是完全可预测的。用 time.Nanoseconds() 进行 seed,只有少量的熵。

改为使用 crypto/rand 的 Reader,并且如果你需要文字,打印为十六进制或 base64:

import (
    "crypto/rand"
    // "encoding/base64"
    // "encoding/hex"
    "fmt"
)

func Key() string {
    buf := make([]byte, 16)
    _, err := rand.Read(buf)
    if err != nil {
        panic(err)  // out of randomness, should never happen
    }
    return fmt.Sprintf("%x", buf)
    // or hex.EncodeToString(buf)
    // or base64.StdEncoding.EncodeToString(buf)
}

Doc Comments

所有顶层、导出的名称都应该有 doc 注释,重要的的非导出类型或函数声明也是如此。在 https://golang.org/doc/effective_go.html#commentary 查看有关注释约定的更多信息。

Don't Panic

查看 https://golang.org/doc/effective_go.html#errors。不要在正常的错误处理中使用 panic,使用 error 和多返回值。

Error Strings

错误字符串不应该大写(除非以专有名词或首字母缩写开头)或者以标点符号结束,因为它们通常是按照其它上下文打印的。也就是说,使用 fmt.Errorf("something bad") 而不是 fmt.Errorf("Something bad"),因此 log.Printf("Reading %s: %v", filename, err) 格式的信息中间没有假的大写字母。这不适用于日志记录,它隐式的面向行,而不是在其它消息中组合。

Examples

当添加一个新包时,包括预期使用的例子:一个可运行的例子,或一个演示完整调用序列的简单测试。

阅读更多关于 testable Example() functions

Goroutine Lifetimes

当你创建 goroutine 时,明确它们何时(或是否)退出。

goroutine 可以通过阻塞在 channel 读或者写上而发生泄漏:GC 将不会终止 goroutine,即使阻塞它的 channel 是不可达的。

即使 goroutine 没有泄漏,在它们不再被需要时而放任不管,也会导致其它微妙且不易诊断的问题。往关闭了的 channel 上发送会导致 panic。在 “不需要结果” 之后修改仍在使用的输入仍然可能导致数据竞争。 并且将 goroutines 放任不管任意长时间会导致不可预测的内存使用。

尽量保持并发代码尽量简单,使 goroutine 的生命期很明显。如果这不可行,记录 goroutine 退出的时间和原因。

Handle Errors

查看 https://golang.org/doc/effective_go.html#errors。不要使用 _ 变量丢弃错误。如果函数返回一个错误,检查它以确保函数成功。处理错误,返回错误,或者,在真正特殊的情况下,panic。

Imports

避免重命名导入,除非是为了避免名称冲突,好的包名不需要重命名。万一发生冲突,最好重命名最本地化或特定于项目的导入。

导入按组组织,它们之间留有空白行。标准库的包总是在第一组中:

package main

import (
	"fmt"
	"hash/adler32"
	"os"

	"appengine/foo"
	"appengine/user"

    "github.com/foo/bar"
	"rsc.io/goversion/version"
)

goimports 会为你做这件事。

ImportBlank

仅因为其副作用而导入的包(使用语法 import _ "pkg")应该只在程序的 main 包中导入,或者在需要它们的测试中。

Import Dot

由于循环依赖关系,导入 . 的形式在测试中有用,不能成为测试包的一部分:

package foo_test

import (
	"bar/testutil" // also imports "foo"
	. "foo"
)

在这种情况下,测试文件不能在包 foo 中,因为它使用 bar/testutil,而 bar/testutil 导入了 foo。因此我们使用 import . 形式来让文件假装是包 foo 的一部分,即使它不是。除了这个例子,不要在你的程序中使用 import . 形式的导入。它使程序难于阅读,因为不清楚像 Quux 这样的名称是当前包中的一个顶级标识符还是导入的包中的。

In-Band Errors

在 C 语言和类似的语言中,函数通常会返回 -1 或 null 这样的值来发出错误或遗漏结果的信号:

// Lookup returns the value for key or "" if there is no mapping for key.
func Lookup(key string) string

// Failing to check a for an in-band error value can lead to bugs:
Parse(Lookup(key))  // returns "parse failure for value" instead of "no value for key"

Go 的支持多个返回值提供了更好地解决方案。而不是要求客户端检查 In-Band 错误值,函数应该返回额外的一个值来表明它其它的返回值是否有效。这个返回值可以是一个 error,或在没必要解释时返回一个布尔值。它应该是最后一个返回值。

// Lookup returns the value for key or ok=false if there is no mapping for key.
func Lookup(key string) (value string, ok bool)

这可以防止调用者错误的使用结果:

Parse(Lookup(key))  // compile-time error

并且鼓励更健壮和更可读的代码:

value, ok := Lookup(key)
if !ok  {
    return fmt.Errorf("no value for %q", key)
}
return Parse(value)

此规则适用于导出的函数,但对非导出的函数也是有用的。

当返回值如 nil,“”,0 和 - 1 这样在它们是函数的有效返回结果时,是可以接受的,即调用者不需要以不同于其它值的方式处理它们。

一些标准库函数,比如那些在包 strings 中的,返回 In-Band error 值。这极大的简化了字符串操作代码,代价是需要程序员进行更多的努力。通常来说,Go 代码应当为 error 返回额外的值。

Indent Error Flow

尽量将正常的代码路径保持在最小的缩进,并缩进错误处理,首先处理它。这通过允许快速扫描正常路径提高了代码可读性。例如,不要这样写:

if err != nil {
	// error handling
} else {
	// normal code
}

而要这样写:

if err != nil {
	// error handling
	return // or continue, etc.
}
// normal code

如果 if 语句有一个初始化语句,比如这样:

if x, err := f(); err != nil {
	// error handling
	return
} else {
	// use x
}

这可能需要将简短的变量声明移动到它们自己单独一行:

x, err := f()
if err != nil {
	// error handling
	return
}
// use x

Initialisms

名称中的首字母或首字母缩写词(例如 "URL" 或 "NATO")有一个一致的案例。例如,URL 应该表现为 URLurl(像在 urlPonyURLPony),永远不要用 Url。作为一个例子:写成 ServeHTTP 而不是 ServeHttp。用于具有多个初始化 “单词” 的标识符,使用例如 xmlHTTPRequestXMLHTTPRequest

这条规则也适用于 ID,当它是 identifier 的缩写时(几乎所有情况下都不是

这个规则也适用于 ID,当它是 identifier 的缩写时(几乎所有情况都不是 “ego”,“superego” 中的 “id”),所以写 appID 而不是 appId

通过 protocol buffer 编译器生成的代码不受此规则的约束。人类写的代码比机器写的代码有更高的标准。

Interfaces

Go 接口通常属于使用接口类型值的包,而不是实现这些值的包。实现包应该返回具体(通常是指针或结构体)类型:这样,可以将新方法添加到实现中,而无需进行大量重构。

不要在 API 的实现者端定义接口 “for mocking”;相反,设计 API 以便可以使用真实实现的公共 API 进行测试。

在使用接口前不要定义接口:没有实际使用的例子,甚至很难看出是否需要接口,更不用说它应该包含什么方法了。

package consumer  // consumer.go

type Thinger interface { Thing() bool }

func Foo(t Thinger) string { … }
package consumer // consumer_test.go

type fakeThinger struct{ … }
func (t fakeThinger) Thing() bool { … }
…
if Foo(fakeThinger{…}) == "x" { … }
// DO NOT DO IT!!!
package producer

type Thinger interface { Thing() bool }

type defaultThinger struct{ … }
func (t defaultThinger) Thing() bool { … }

func NewThinger() Thinger { return defaultThinger{ … } }

相反,返回一个具体类型,让使用者模拟生产者实现。

package producer

type Thinger struct{ … }
func (t Thinger) Thing() bool { … }

func NewThinger() Thinger { return Thinger{ … } }

Line Length

Go 代码中没有严格的行长度限制,但是要避免让人不舒服的长行。同样,当长行的可读性更好时,不要为了保持行较短来添加换行符 —— 例如,如果它们是重复的。

大多数时候,当人们用“不正常”的方式包装行时(在函数调用或函数声明的中间,可以说,或多或少有些例外),如果它们有合理数量的参数和合理简短的变量名时包装就没有必要了。长行似乎与长名称有关,摆脱长名称有很大帮助。

换句话说,可以因为要编写的内容的语义而换行(一般来说),而不是因为行的长度。如果你发现这会产生太长的行,那么更改名称或语义可能会得到比较好的结果。

实际上,这与关于函数应该有多长的建议完全相同。没有 “永远不会有超过 N 行的函数” 这样的规则,但是程序中肯定会存在行数太多,功能过于微弱的函数,而解决方案是改变这个函数边界的位置,而不是执着在行数上。

Mixed Caps

查看 https://golang.org/doc/effective_go.html#mixed-caps。即使它违反了其它语言中的约定,这也适用。例如未导出的常量是 maxLength 而不是 MaxLengthMAX_LENGTH

也可以查看 Initialisms

Named Result Parameters

想想在 godoc 中会是什么样子。指定的结果参数如下:

func (n *Node) Parent1() (node *Node)
func (n *Node) Parent2() (node *Node, err error)

在 godoc 中会造成口吃(stutter),最好这样使用:

func (n *Node) Parent1() *Node
func (n *Node) Parent2() (*Node, error)

另一方面,如果一个函数返回两个或三个相同类型的参数,或者,如果结果的含义从上下文看不清楚,那在某些上下文中添加名称可能会很有用。不要仅仅为了避免在函数内部用 var 声明变量就给结果参数命名;这是以不必要的 API 冗长为代价,换取了少量的实现简洁性。

func (f *Foo) Location() (float64, float64, error)

不如这样清晰:

// Location returns f's latitude and longitude.
// Negative values mean south and west, respectively.
func (f *Foo) Location() (lat, long float64, err error)

如果函数行数较少,那裸返回是可以的。一旦它是一个中等规模的函数,要明确返回值。推论:仅仅因为它使得能够使用裸返回就给结果参数命名是不值得的。文档的清晰性永远比在函数中节省一两行代码要重要。

最后,在某些情况下需要为结果参数命名,以便在延迟闭包中更改它。这总是可以的。

Naked Returns

查看 [Named Result Parameters](# Named Result Parameters)。

Package Comments

包的注释,就像 godoc 提供的所有注释一样,必须出现在 package 子句旁边,没有空行。

// Package math provides basic constants and mathematical functions.
package math
/*
Package template implements data-driven templates for generating textual
output such as HTML.
....
*/
package template

对于 main 包的注释,其它类型的注释可以放在二进制名称之后(并且如果它是第一个,就可以大写),例如,对于在目录 seedgen 中的 package main 你可以这样写:

// Binary seedgen ...
package main

或这样

// Command seedgen ...
package main

或这样

// Program seedgen ...
package main

或这样

// The seedgen command ...
package main

或这样

// The seedgen command ...
package main

或这样

// Seedgen ..
package main

这些是举例,它们的合理变体是可以接受的。

注意,以小写单词开头的句子不属于包注释的可接受选项,因为这些都是公开可见的,应该用正确的英语写,包括将句子的第一个单词大写。当二进制名称是第一个单词时,即使它与命令行调用的拼写不完全匹配,也需要将其大写。在 https://golang.org/doc/effective_go.html#commentary 查看有关注释约定的更多信息。

Package Names

包中名称的所有引用都将使用包名完成,因此你可以从标识符中省略该名称。例如,如果在包 chubby 中,不应该把类型名称定义为 ChubbyFile,否则使用者将写成 chubby.ChubbyFile。而是应该把类型命名为 File,使用者将写成 chubby.File。避免无意义的包名,如 util,common,misc,api,types,和 interfaces。

http://golang.org/doc/effective_go.html#package-nameshttp://blog.golang.org/package-names 查看更多。

Pass Values

不要仅仅为了节省几个字节而将指针作为函数参数传递。如果函数从始至终只是将它的参数 x 表示为 *x,那么参数就不应该是指针。常见的实例包括传递一个字符串的指针 *string,或一个接口值的指针,如 *io.Reader。在这两种情况下,值本身都是固定大小,可以直接传递。这个建议不适用于大型结构体,或即使很小但可能增长的结构体。

Receiver Names

一个方法的 receiver 的名称应该是它身份的反映;通常一个或两个字母的缩写就够了(比如把 "Client" 缩写为 "c" 或 "cl")。不要使用通用名,比如 "me","this" 或 "self",典型的面向对象语言标识符,赋予方法特殊的含义。在 Go 中,方法的 receiver 仅仅是另一个参数,因此应该相应的命名。名称不需要像方法参数那样具有描述性,因为它的作用是显而易见的,没有记录的目的。它可以非常短,因为它几乎出现在该类型的每个方法的每一行中,familiarity admits brevity。使用上也要一致:如果你在一个方法中把 receiver 称为 "c",不要在另一个方法中称它为 "cl"。

Receiver Type

在方法上选择使用值还是指针 receiver 可能比较困难,特别是对新手来说。如果有疑问,可以使用指针,但有时候,值 receiver 也是有意义的,通常是为了效率,例如对于小型不变的结构或基本类型的值。一些有用的指南:

  • 如果 receiver 是 map、func 或 chan,不要使用它们的指针。如果 receiver 是 slice 并且方法不对其进行 reslice 或 reallocate,不要使用它的指针。
  • 如果方法需要改变 receiver,receiver 必须用指针。
  • 如果 receiver 是一个包含 sync.Mutex 或类似的同步字段的结构体,receiver 必须使用指针来避免拷贝。
  • 如果 receiver 是一个大的结构体或数组,指针 receiver 效率更高。多大才算大?假设要将其所有元素作为参数传递给方法,如果这样感觉太大的话,那它对于 receiver 来说也太大了。
  • 函数或方法(并发执行或从该方法调用时)是否会使 receiver 发生变化?当该方法被调用时,值类型的 receiver 会创建一个 receiver 的副本,因此外部更新将不会应用到此 receiver。如果更改必须在原始 receiver 中可见,那该 receiver 必须是一个指针。
  • 如果 receiver 是一个 struct、数组、或切片,并且它的任何元素都是指向可能发生改变的对象的指针,最好使用指针类型的 receiver,因为它将使读者更清楚的了解意图。
  • 如果 receiver 是一个自然是值类型的小数组或 struct (例如,类似 time.Time 类型的对象),没有可变字段和指针,或者只是一个简单的基本类型,如 int 或 string,用值类型的 receiver 也可以。值 receiver 可以减少可生成的垃圾数量;如果一个值被传递给一个值 receiver 的方法,将使用栈上的副本而不是在堆上分配(编译器试图聪明的避免这种分配,但不是总能成功)。因此,在没有分析之前,不要因为此原因而选择值类型 receiver。
  • 最后,当有疑问时,使用指针 receiver。

Synchronous Functions

与异步函数相比,更喜欢同步函数(同步函数指直接返回其结果或在返回前完成任何回调或 channel 操作的函数)。

同步函数将 goroutines 本地化在一个调用中,更容易推断它们的寿命,以及避免泄漏和数据竞争。它们也更容易测试:调用者可以传递一个输入并检查输出,而不需要轮询或同步。

如果调用者需要,他们可以通过在单独的 goroutine 中调用此函数来轻松地提升并发性。但在调用端移除不需要的并发性非常困难,有时候是不可能的。

Useful Test Failures

测试失败时,应当提供有用的信息,来说明哪里出错了,输入什么,得到什么,期望什么。写一堆 assertFoo helpers 可能很诱人,但要确保 helpers 生成有用的错误消息。假设调试失败测试的人不是你,也不是你团队中的人。一个典型的 Go 测试失败如下:

if got != tt.want {
	t.Errorf("Foo(%q) = %d; want %d", tt.in, got, tt.want) // or Fatalf, if test can't test anything more past this point
}

注意这里的顺序是 actual != expected,消息也使用这个顺序。一些测试框架鼓励逆向编写这些内容:0 != x,"expected 0, got x", 等等。Go 不鼓励这样。

如果这看起来要大量的打字,你可以写一个表驱动测试

当使用具有不同输入的测试 helper 时,另一个消除失败测试歧义的通用技巧是用不同的 TestFoo 函数来封装每个调用者,因此测试失败具有以下名称:

func TestSingleValue(t *testing.T) { testHelper(t, []int{80}) }
func TestNoValues(t *testing.T)    { testHelper(t, []int{}) }

在任何情况下,您都有责任失败,并向将来进行代码调试的人提供有用的信息。

Variable Names

Go 中的变量名应该短而不是长。对于范围有限的局部变量尤其如此。首选 c 而不是 lineCount。首选 i 而不是 sliceIndex

基本原则:距使用名称的声明越远,该名称就必须越具有描述性。对于一个方法的 receiver 来说,一两个字母就足够了。常见的变量,如循环索引和 readers,可以用单个字母(i, r)。更不寻常的对象和全局变量需要更多的描述性名称。

Go 中关于方法的 receiver 的总结

关于这部分内容,在写代码时一直都是用指针类型的 receiver,但没有系统整理过规则,这里进行总结。

首先是官方 FAQ 中说的那三条:

  • 第一条也是最重要的一条,方法是否要修改 receiver?
  • 其次是效率的考虑,如果 receiver 非常大,比如说一个大 struct,使用指针将非常合适。
  • 接下来是一致性,如果该类型的某些方法必须使用指针 receiver,剩下的也要使用指针。不论使用什么类型的 receiver,方法集要一致。

还有一些其它的规则:

  • 实例和实例指针可以调用值类型和指针类型 receiver 的方法。
  • 如果通过 method express 方式,struct 值只能调用值类型 receiver 的方法,而 struct 指针是能调用值类型和指针类型 receiver 的方法的。
  • 如果 receiver 是 mapfuncchan,不要使用指针。
  • 如果 receiver 是 slice,并且方法不会重新分配 slice,不要使用指针。
  • 如果 receiver 是包含 sync.Mutex 或其它类似的同步字段的结构体,receiver 必须是指针,以避免复制。
  • 如果 receiver 是大 structarray,receiver 用指针效率会更高。那么,多大是大?假设要把它的所有元素作为参数传递给方法,如果这样会感觉太大,那对 receiver 来说也就太大了。
  • 如果 receiver 是 structarrayslice,并且它的任何元素都是可能发生改变的内容的指针,最好使用指针类型的 receiver,这会使代码可读性更高。
  • 如果 receiver 是一个本来就是值类型的小 arraystruct,没有可变字段,没有指针,或只是一个简单的基础类型,如 intstring,使用值类型的 receiver 更合适。
  • 值类型的 receiver 可以减少可以生成的垃圾量,如果将值传递给值方法,可以使用栈上的副本而不是在堆上进行分配。编译器会尝试避免这种分配,但不会总成功。不要为此原因却不事先分析而选择值类型的 receiver。
  • 最后,如有疑问,请使用指针类型的 receiver。

下面看两个比较容易搞混的例子:

package main

import (
	"fmt"
)

type Ball struct {
	Name string
}

func (b *Ball) Ping() {
	fmt.Println("ping")
}

func (b Ball) Pong() {
	fmt.Println("pong")
}

func main() {
	v := Ball{}
	p := &Ball{}

	v.Ping()
	v.Pong()

	p.Ping()
	p.Pong()
}

运行结果是都可以正常执行:

❯ go run test.go
ping
pong
ping
pong

也就是说,struct 的实例和实例指针都可以调用值类型和指针类型 receiver 的方法。

再看这段代码,这里是通过 method expression 的方式调用方法:

package main

import (
	"fmt"
)

type Ball struct {
	Name string
}

func (b *Ball) Ping() {
	fmt.Println("ping")
}

func (b Ball) Pong() {
	fmt.Println("pong")
}

func main() {
	v := Ball{}
	
	Ball.Ping(&v)
	Ball.Pong(v)
}

这次的执行结果呢?

❯ go run test.go
# command-line-arguments
./t.go:23:6: invalid method expression Ball.Ping (needs pointer receiver: (*Ball).Ping)
./t.go:23:6: Ball.Ping undefined (type Ball has no method Ping)

可以看到,通过 method expression 的方式,struct 值只能调用值类型 receiver 的方法。

再看 struct 指针调用方法:

func main() {
	p := &Ball{}

	(*Ball).Ping(p)
	(*Ball).Pong(p)
}

执行结果:

❯ go run test.go
ping
pong

即 struct 指针是能调用值类型和指针类型 receiver 的方法的。

但在写代码时,不建议使用 method expression 这种方式来调用方法。不过应该也没有人会用这种方式的...吧?

go mod vendor 使用问题记录

问题

在项目中使用 go mod vendor 将依赖库拷贝到了 vendor 目录中,用着用着出现问题了:在想使用一个包的某个函数时,IDE 的提示死活出不来。刚开始很纳闷,难道是 IDE 抽风了?不过又一想,没有玄学,肯定是哪里出问题了。

去 vendor 目录对应 path 下找该函数,翻遍了几个文件都没找到。又去 GitHub 仓库我使用的版本里面找了找,欸?这里的文件怎么比我 vendor 目录下面的多好多?

奇妙

在尝试各种 go mod 操作之后,vendor 目录中依赖库的文件还是不完整。Google 了一下,发现这个 Issue,有人专门写了个 工具 解决这个问题。

。。。

go mod vendor 只把项目中 import 的部分拷贝到了 vendor 目录,要是不使用第三方工具的话,合着我只能在写完全部项目后才能执行 go vendor?

SSH 免密登录

目标:通过 SSH 从 Server A 免密登录到 Server B。为了表述方便清晰,下面将 Server A 称为本地主机,将 Server B 称为远程主机。

方式:Public Key 认证。

原理:

  • 在本地主机上通过 ssh-keygen 生成一对密钥,简单说就是公钥用来加密,私钥用来解密(可扩展学习非对称加密)。
  • 将公钥拷贝到远程主机,然后在本地主机进行 SSH 连接。
  • 远程主机上的 sshd 会产生一个随机数并用上面的公钥进行加密后发给本地主机,本地主机会用私钥进行解密并把这个随机数发回给远程主机。
  • 远程主机的 sshd 会认为本地主机拥有与该公钥匹配的私钥,允许登录。

步骤:

主机系统为 CentOS 7。

  • 远程主机的 sshd 服务相关操作:

    systemctl status sshd
    systemctl start sshd
    systemctl stop sshd
    systemctl reload sshd

    需要在配置中启用公钥登录,vi /etc/ssh/sshd_config

    # 配置文件
    PubkeyAuthentication yes
    #PasswordAuthentication yes
  • 在本地主机生成一对密钥:

    ssh-keygen
  • 将公钥拷贝到远程主机,重命名 authorized_keys:

    scp id_rsa.pub root@ServerBIP:~/.ssh
    mv ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys

    或通过 ssh-copy-id 命令自动完成:

    ssh-copy-id -i ~/.ssh/id_rsa.pub root@ServerBIP
  • 本地主机向远程主机发送一个连接请求,信息包括用户名、IP:

    ssh root@serverBIP
  • 远程主机收到请求,会从 authorized_keys 中查找是否有相同的用户名、IP,如果有,它会随机生成一个字符串,并用公钥加密,然后发送给本地主机。

  • 本地主机收到远程主机发来的信息后,会使用私钥进行解密,然后将解密后的字符串发送给远程主机。

  • 远程主机收到本地主机发来的信息后,会和之前生成的字符串进行比对,如果一致,则允许免密登录。

使用:

  • 查看远程主机 IP 地址:

    ip addr
  • 登录:

    ssh root@ServerBIP

注意:

  • 将远程主机中 ~/.ssh 的权限设置为 700。
  • /.ssh/authorized_keys 的权限设置为 600。

singleflight 源码阅读

缓存击穿

在缓存系统中,当某个热点数据的缓存过期时,如果瞬间有大量请求到达 DB,可能会导致 DB 出现问题。

怎么解决?可以让先到达的请求将最新数据更新到缓存,其它请求再使用缓存数据即可。

Go 中的 singleflight 包就可以用于这个场景。它可以只让其中一个请求得到执行,其余请求会阻塞到该请求返回执行结果并使用该结果,从而达到防止击穿的效果。

singleflight 源码

singleflight.go 的源码比较简单:

// 表示一组请求
type call struct {
  // 用来阻塞其余请求
	wg sync.WaitGroup

	// 被调用的函数返回的结果和 err 赋值给这两个字段
	val interface{}
	err error

	// forgotten indicates whether Forget was called with this call's key while the call was still in flight.
	forgotten bool

	// These fields are read and written with the singleflight
	// mutex held before the WaitGroup is done, and are read but
	// not written after the WaitGroup is done.
	dups  int
	chans []chan<- Result
}

// 用来存储不同 key 的请求组
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}

// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
	Val    interface{}
	Err    error
	Shared bool
}

// 执行指定函数并返回结果
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()
  // 延迟初始化
	if g.m == nil {
		g.m = make(map[string]*call)
	}
  // 如果当前请求之前已经有对该 key 的请求, 则阻塞当前请求
	if c, ok := g.m[key]; ok {
		c.dups++
		g.mu.Unlock() // 释放锁
		c.wg.Wait() // wg 的这种用法妙啊

		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
    // 使用前面请求返回的结果
		return c.val, c.err, true
	}
  
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c // 用 key 来标识一组同样的请求
	g.mu.Unlock()

  // 调用 fn 并返回结果
	g.doCall(c, key, fn)
	return c.val, c.err, c.dups > 0
}

// 与 Do 方法效果一样,不过这里不阻塞当前请求,而是直接返回一个 channel 用于接收结果
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
	ch := make(chan Result, 1) // 当前请求从这个 ch 接收结果
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.dups++
		c.chans = append(c.chans, ch)
		g.mu.Unlock()
		return ch
	}

	c := &call{chans: []chan<- Result{ch}}
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

  // 异步调用 fn
	go g.doCall(c, key, fn)

	return ch
}

// doCall handles the single call for a key.
// 对 fn 执行调用, 并将结果
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false
	recovered := false

	// use double-defer to distinguish panic from runtime.Goexit,
	// more details see https://golang.org/cl/134395
  // 这个 defer 会后执行, 负责判断
	defer func() {
		// the given function invoked runtime.Goexit
		if !normalReturn && !recovered {
			c.err = errGoexit
		}

		c.wg.Done()
		g.mu.Lock()
		defer g.mu.Unlock()
		if !c.forgotten { // 返回之前删除这个 key, 标识着本次对 key 的调用完成
			delete(g.m, key)
		}

		if e, ok := c.err.(*panicError); ok {
			// In order to prevent the waiting channels from being blocked forever,
			// needs to ensure that this panic cannot be recovered.
			if len(c.chans) > 0 {
				go panic(e)
				select {} // Keep this goroutine around so that it will appear in the crash dump.
			} else {
				panic(e)
			}
		} else if c.err == errGoexit {
			// Already in the process of goexit, no need to call again
		} else {
      // doCall 返回之前将调用结果写入到所有其余等待结果的请求的 channel 中
			for _, ch := range c.chans {
				ch <- Result{c.val, c.err, c.dups > 0}
			}
		}
	}()

  // 执行 fn 的部分, 而且是在一个匿名函数里面执行.
  // 这里又用到了一个 defer(这个 defer 会先执行), 用来区分 panic 和 runtime.Goexit.
	func() {
		defer func() {
			if !normalReturn {
				if r := recover(); r != nil {
					c.err = newPanicError(r) // 如果 panic 了, 修改 c.err 的值
				}
			}
		}()

		c.val, c.err = fn()
		normalReturn = true
	}()

	if !normalReturn {
		recovered = true
	}
}

// Forget tells the singleflight to forget about a key.  Future calls
// to Do for this key will call the function rather than waiting for
// an earlier call to complete.
func (g *Group) Forget(key string) {
	g.mu.Lock()
	if c, ok := g.m[key]; ok {
		c.forgotten = true
	}
	delete(g.m, key)
	g.mu.Unlock()
}

代码虽然不多,里面一些巧妙的用法可以学习。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.