背景

自从接触 golang 语言后,发现编程是简化了不少,不用把大量时间消耗在如何正确编写代码,而专注于代码的实现。

心得

优势

  1. 开发时顺着设计思路,实现不会跑偏。
  2. 并发逻辑高效,随时开协程。

劣势:

复用比较费劲,复杂的设计模式实现起来不容易。

golang 没有继承的概念,只有组合,多态支持的不完整。通过组合来实现方法的覆盖,这里有个严重的问题,就是覆盖的方法 A 中,如果调用了内嵌结构体的方法 B,该方法 B 运行就在内嵌结构体,如果在方法 B 调用覆盖的其他方法是无效的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type Base struct{}

func (b *Base) MethodA() {
fmt.Println("Base MethodA")
b.MethodB() // 调用 Base 的 MethodB
}

func (b *Base) MethodB() {
fmt.Println("Base MethodB")
}

type Derived struct {
Base
}

func (d *Derived) MethodB() {
fmt.Println("Derived MethodB")
}

func main() {
d := Derived{}
d.MethodA() // 输出: Base MethodA, Base MethodB
}

如果你在覆盖的方法中调用了内嵌结构体的方法,可能会导致不符合预期的行为。因为内嵌结构体的方法调用不会自动使用外层结构体中覆盖的方法,而是使用内嵌结构体原有的方法。

为了避免这种情况:

  1. 避免在覆盖的方法中调用内嵌结构体的方法:如果可能,尽量在覆盖的方法中直接实现所需的逻辑,而不是依赖内嵌结构体的方法。
  2. 使用接口:通过接口来定义行为,并在外层结构体中实现这些接口。这可以提供更好的多态性和灵活性。
  3. 重新考虑设计,看看是否有更好的方式来组织代码。
  4. 单元测试尽量覆盖一下,避免出现预期外的情况。

序言

今天为了跑一个古老的C程序,安装了 libwebsockets 。本以为 brew install 是一件很简单的事情,没想到耗费了一个上午的时间,特此记录一下。

初步探索

brew install libwebsockets 运行后,发现安装的是 4.3.2 的版本,本来不关心,但是程序编译时报错,看报错信息时结构体成员找不到。很明显,安装的版本过高导致的,遂找到 4.0.21 的版本想安装。
brew search libwebsockets 发现并没有别的版本可供选择。这会,有个选择时下载 libwebsockets 源码,自己编译安装,但是并不想走这条路。哪里有问题就从哪里解决。
直接找到 homebrew 里 libwebsockets.rb,找到其历史版本,运行 brew install https://raw.githubusercontent.com/Homebrew/homebrew-core/fd1f75f399384faa19b4951a404f716507b0808c/Formula/libwebsockets.rb,但是 brew 报错不支持 url 这种方式,提示使用 extract。
又研究了一下 extract,不得要领。去网上搜到这几个真言,如获至宝,自信满满的开始尝试。

1
2
3
4
5
6
# 1. create a new tap
brew tap-new $USER/local-<packageName>
# 2. extract into local tap
brew extract --version=1.2.3 <packageName> $USER/local-<packageName>
# 3. run brew install@version as usual
brew install <packageName>@1.2.3
  1. 执行 brew tap-new xin.ye/local-https://raw.githubusercontent.com/Homebrew/homebrew-core/fd1f75f399384faa19b4951a404f716507b0808c/Formula/libwebsockets.rb,报错,尝试几次发现 packageName 是指 libwebsockets,不指当前计划安装的地址。执行 brew tap-new xy/local-libwebsockets 成功
  2. 执行 brew extract --version=4.0.21 https://raw.githubusercontent.com/Homebrew/homebrew-core/fd1f75f399384faa19b4951a404f716507b0808c/Formula/libwebsockets.rb xy/local-libwebsockets 出现错误,报错信息里还把填的地址都转成小写,让我一度以为是我地址有问题,找了一圈。最后幡然醒悟,它打印的日志 Searching repository history 是不是代表回去历史记录里查,不需要我输入具体地址。执行 brew extract --version=4.0.21 libwebsockets xy/local-libwebsockets 成功。
  3. 执行 brew install [email protected] 成功。

结论

brew extract 会自动去查找 gtihub 安装模块的历史版本,所以其实你不需要关心历史版本的 url 地址,只要知道版本号即可。
一开始被历史版本的 url 地址误导的有点深。特此记录一下。

抓包

一般都有 tcpdump/ngrep,tcpdump 可读性比较差,一般只有在怀疑 tcp 连接出现问题,会用 tcpdump 去查看 tcp 的连接状态。ngrep 对比之下,使用比较方便,基本能满足一般需求。

ngrep tcpdump
过滤规则 支持 host\port\net\proc 支持 host\port\net\proc
易读性 较高(http 协议等很清晰) 较低
过滤字符串 支持 不支持

ngrep

ngrep 方便的数据包匹配和显示工具,可以过滤网络数据包并搜索字符串,是grep(在文本中搜索字符串的工具)的网络版。

输出内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
interface: any
filter: ( port 8083 ) and (ip || ip6)
match: /withdra(.+)?/wechat
#
T 127.0.0.1:4108 -> 127.0.0.1:8083 [AP] #1
POST /station/withdraw/wechat HTTP/1.0.
Host: internal-bytepower-station-private-alb-1244500456.cn-northwest-1.elb.amazonaws.com.cn.
X-Real-IP: 172.31.73.171.
X-Forwarded-For: 172.31.48.224, 172.31.73.171.
X-Forwarded-Proto: http.
Content-Length: 411.
X-Forwarded-Port: 80.
X-Amzn-Trace-Id: Root=1-61b7089b-56e72bb92639c4ff7962f8c6.
User-Agent: Go-http-client/1.1.
Content-Type: application/json.
Accept-Encoding: gzip.
.
{"app_id":"APPERXW6VEMRTM5P","app_metric_name":"010.RunFast03_K","cash_num":0.3,"cuid":"e419e36a533c6545","desc":".................................","timezone":28800,"user_id":"UUCG5AOFGFABJ","wechat_app_id":"wx8efbae40ad0b8de5","wechat_merchant_id_list":["1605715141","1560154181"],"wechat_open_id":"o3HRV647ucInh5T1mVtn8gRKNwks","wechat_union_id":"oydXy0x1bft35XV-qwyH-0qnb3g4","withdraw_id":"WCHHX5Y2R7GYIA"}
#####
T 127.0.0.1:8083 -> 127.0.0.1:4108 [AP] #6
HTTP/1.0 200 OK.
Content-Length: 210.
Date: Mon, 13 Dec 2021 08:47:23 GMT.
Content-Type: application/json; charset=utf-8.
.
{"withdraw":{"cash_num":0.3,"reason":"","status":"WITHDRAW_SUCCESS","wechat_merchant_id":"1605715141","wechat_err_code":"","withdraw_id":"WCHHX5Y2R7GYIA","withdraw_time":"2021-12-13T16:47:23.647956966+08:00"}}
########
  • 第一行是网卡
  • 第二行是过滤的规则
  • 第三行#,未匹配的数据包都以“#”显示
  • 第四行监听到的 http 请求协议,请求 url,请求头和请求的参数内容。
  • 第9行未匹配的数据包都以“#”显示。
  • 第19行是 http 返回值

参数说明

  • 过滤指定网卡的数据包
    -d 要查看所有网卡,可以 -d any
  • 查看内容
    -W: 一般为了可读性使用 byline。有4种格式 normal, byline, single, none。
    -n 仅捕获指定数目的数据包进行查看
    -A 匹配到数据包后dump随后的指定数目的数据包
    -T 显示上一个匹配的数据包之间的时间间隔
  • 二进制查看
    -xX 左边会显示16进制格式,右边显示可读字符
  • 输出或读取文件
    -O 将匹配的数据保存到.dump文件
    -I 从.dump文件中读取数据进行匹配
  • 其它
    -q 静默模式,如果没有此开关,未匹配的数据包都以“#”显示
    -t 显示匹配到数据包的时间

语法

1
2
3
ngrep <-LhNXViwqpevxlDtTRM><-IO pcap_dump><-n num><-d dev><-A num>
<-s snaplen><-S limitlen><-w normal|byline|single|none><-c cols>
<-P char><-F file><match expression><bpf filter>

简化
ngrep -W byline -d any '过滤字符串,支持表达式' '过滤规则'
在实际使用中,没有过滤字符串最好也加上'',因为过滤规则可能被这个影响。所以如果发现输出都是 ##### 且不符合预期,最好先检查一下自己的命令,过滤字符串和过滤规则格式是否正确。

过滤规则

  • 基于地址过滤:host

    1
    sudo ngrep -W byline -d any '' 'host 127.0.0.1'

    数据包的 ip 可以再细分为源ip和目标ip两种

    1
    2
    3
    4
    5
    # 根据源ip进行过滤
    sudo ngrep -W byline -d any '' 'src host 127.0.0.1'

    # 根据目标ip进行过滤
    sudo ngrep -W byline -d any '' 'dst host 127.0.0.1'

    支持域名

  • 基于端口过滤:port
    使用 port 就可以指定特定端口进行过滤

    1
    sudo ngrep -W byline -d any '' 'port 8083'

    端口同样可以再细分为源端口,目标端口

    1
    2
    3
    4
    5
    # 根据源端口进行过滤
    sudo ngrep -W byline -d any '' 'src port 8083'

    # 根据目标端口进行过滤
    sudo ngrep -W byline -d any '' 'dst port 8083'

    也可以指定一个端口段

    1
    sudo ngrep -W byline -d any '' 'portrange 80-8082'
  • 其它
    能识别TCP、UDP和ICMP包,使用 tcptdpicmp
    tcp 里面 ip 和 ip6 区分,使用ipip6
    支持网段过滤,使用示例 net 172.17.0.0/24,支持源和目标网段。

  • 过滤规则组合
    支持 and、or、not。

    1
    2
    3
    and:所有的条件都需要满足,也可以表示为 &&
    or:只要有一个条件满足就可以,也可以表示为 ||
    not:取反,也可以使用 !

    也有可能需要用到括号()

场景

基础

  • 指定 port
    查看请求,监听相应端口即可,以 station 举例,端口是 8083,示例如下:
    sudo ngrep -W byline -d any '' 'port 8083'

sudo ngrep -W byline -d any port 8083 这样写有没有问题?sudo ngrep -W byline -d any src port 8083 这样写会不会有问题?

PostgreSQL 默认端口是 5432
sudo ngrep -W byline -d any '' 'port 5432'

Redis 默认端口是 6379
sudo ngrep -W byline -d any '' port 6379'

  • 指定 host
    需要过滤对应的 host 和 port,比如过滤出某个数据库的请求。可以使用域名
    sudo ngrep -W byline -d any '' 'host 172.17.20.138'
    sudo ngrep -W byline -d any '' 'host bytepower-station-db.cluster-cv0oenksk2p5.rds.cn-northwest-1.amazonaws.com.cn'

  • 正则过滤
    过滤请求的 url
    sudo ngrep -W byline -d any '/withdra(.+)?/wechat' 'port 8083'

  • 查看二进制
    sudo ngrep -xX -W byline -d any '/withdra(.+)?/wechat' 'port 8083'

  • udp
    sudo ngrep -d any '' 'udp'

进阶

  • 分析
    排查问题的时候,很多时候我们需要对一份数据反复分析查看。
    这种情况下:

    • 先收集导出到文件,这个时候过滤条件可以比较少甚至不设置。
      sudo ngrep -W byline -O /tmp/check_1.dump -d any -t
    • 退出后,针对这个文件进行搜索分析。
      sudo ngrep -W byline -I /tmp/check_1.dump '^GET' 'port 8083'
      sudo ngrep -W byline -I /tmp/check_1.dump 'app_id=APPIFP5WATVLJAZ3' 'port 8083'
      如何看到这个请求后面的一系列操作,可以使用 -A 来过滤出匹配到数据包后面的数据包
      sudo ngrep -W byline -I /tmp/check_1.dump -A 10 'app_id=APPIFP5WATVLJAZ3' 'port 8083'
      通过做这种途径可以看到后续的 sql 和 redis 相关操作。还是难以分析?可以循环上面的操作,过滤导出文件在分析
      sudo ngrep -W byline -I /tmp/check_1.dump -O /tmp/check_2.dump -A 10 'user_id=UUCGNNN7KAXGQ'
      这是一种分析的思路。当然你要是正则够厉害,可以一个复杂的正则搞定。
  • 实际使用
    在测试部署到机器上后,通过 ngrep 来查看请求发起了多少次数据库查询、redis 操作。这在和产品组联调的时候很有用。
    sudo ngrep -W byline '' 'not port 22'
    sudo ngrep -d any '' 'portrange 5000-8082'
    线上的时候,因为每秒数百上千个请求,这个时候就结合

组合使用

可以对结果使用 awk 或者 grep 过滤
sudo ngrep -W byline -d any '' 'port 8083' |grep 10 '/withdraw'

tcpdump

tcpdump是一个对网络上的数据包进行截获的包分析工具。它允许用户截获和显示发送或收到通过网络连接到该计算机的TCP/IP和其他数据包。

输出内容

1
08:31:54.674766 IP ip-172-30-1-113.cn-northwest-1.compute.internal.http > ip-172-31-48-163.cn-northwest-1.compute.internal.64372: Flags [P.], seq 619:14807, ack 1313, win 285, options [nop,nop,TS val 3247021864 ecr 577459112], length 14188: HTTP: HTTP/1.1 200 OK
  • 第一列:时分秒毫秒 21:26:49.013621
  • 第二列:网络协议 IP
  • 第三列:发送方的地址+端口号,其中172.20.20.1是 ip,而15605 是端口号
  • 第四列:箭头 >, 表示数据流向
  • 第五列:接收方的ip地址+端口号,其中 172.20.20.2 是 ip,而5920 是端口号
  • 第六列:冒号
  • 第七列:数据包内容,包括Flags 标识符,seq 号,ack 号,win 窗口,数据长度 length,其中 [P.] 表示 PUSH 标志位为 1,更多标识符见下面

参数说明

  • 控制详细内容的输出
    一般 -v 就足够使用了
    -v 产生详细的输出. 比如包的TTL,id标识,数据包长度,以及IP包的一些选项
    -vv 产生比-v更详细的输出。比如NFS回应包中的附加域
    -vvv 产生比-vv更详细的输出。比如 telent 时所使用的SB
  • 过滤指定网卡的数据包
    -i 要查看所有网卡,可以 -i any
  • 过滤特定流向的数据包
    -Q: 选择是入方向还是出方向的数据,可选项有:in, out, inout
  • 输出或读取文件
    -w 后接一个以 .pcap 后缀命令的文件名
    -r 从 .pcap 文件读取
  • 其它
    -n 不把ip转化成域名,直接显示 ip
    -nn 不把协议和端口号转化成名字
    -s number,number 是 0 的话,表示截取报文全部内容
    -l 基于行的输出,便于你保存查看,或者交给其它工具分析

场景

  • 查看请求
    我们一般使用 nginx 中转,因为 nginx 转到 127.0.0.1,需要指定 interface
    sudo tcpdump port 8083 -i lo

  • 查看完整的请求
    默认的话 tcpdump 只显示部分数据包,默认68字节。
    sudo tcpdump -s 0 -v -n -l

  • 图形化查看
    通过ssh传到本地 Wireshark ,查看内容
    ssh bytepower-appserver-tasklet 'sudo tcpdump -s 0 -c 1000 -nn -w - not port 22' | /Applications/Wireshark.app/Contents/MacOS/Wireshark -k -i -

附录

ngrep 使用官方文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
ngrep: invalid option -- '-'
usage: ngrep <-hNXViwqpevxlDtTRM> <-IO pcap_dump> <-n num> <-d dev> <-A num>
<-s snaplen> <-S limitlen> <-W normal|byline|single|none> <-c cols>
<-P char> <-F file> <-K count>
<match expression> <bpf filter>
-h is help/usage
-V is version information
-q is be quiet (don't print packet reception hash marks)
-e is show empty packets
-i is ignore case
-v is invert match
-R is don't do privilege revocation logic
-x is print in alternate hexdump format
-X is interpret match expression as hexadecimal
-w is word-regex (expression must match as a word)
-p is don't go into promiscuous mode
-l is make stdout line buffered
-D is replay pcap_dumps with their recorded time intervals
-t is print timestamp every time a packet is matched
-T is print delta timestamp every time a packet is matched
specify twice for delta from first match
-M is don't do multi-line match (do single-line match instead)
-I is read packet stream from pcap format file pcap_dump
-O is dump matched packets in pcap format to pcap_dump
-n is look at only num packets
-A is dump num packets after a match
-s is set the bpf caplen
-S is set the limitlen on matched packets
-W is set the dump format (normal, byline, single, none)
-c is force the column width to the specified size
-P is set the non-printable display char to what is specified
-F is read the bpf filter from the specified file
-N is show sub protocol number
-d is use specified device instead of the pcap default
-K is send N packets to kill observed connections

引言

time.Sleep(1*time.Second) 这个方法在某些场景很有用,在使用它的时候不由想到。它内部实现原理是什么?如果只是想让出 cpu 时间是不是有更好的方法?执行sleep 后最短回来的时间是多少?

探索分析

首先,我们打开 time.Sleep 源码,出乎意料,一个空的方法,瞬间我迷茫了,难道 go 还有这种使用姿势,定义一个空的方法,然后延迟实现?找了半天不得其法,查完资料才发现在 runtime/time.go 里面有个 timeSleep 方法,有完整的实现。 看注释 //go:linkname timeSleep time.Sleep ,这是一种全新的体验啊,赶紧学习了一下使用姿势。

1
2
3
4
5
6
7
`//go:` 是编译指令,编译器接受注释形式的指令。
实测下来挺坑的,要在具体实现地方引入_ "unsafe” 且在方法添加注释`//go:linkname tSleep swcontent/test/go/sleep/outer.TSleep` ,在声明方法包引入具体实现的包名 import _ "swcontent/test/go/sleep/interval” ,在编译的时候还要用参数忽略错误,简单做法是在声明方法包添加 i.s 空文件。实际使用的价值没那么大,主要知道这个可以辅助我们阅读 go 源码。
还有其他的比如
`//go:nosplit` 跳过栈溢出检查,stack 一开始是2k,会动态增长。
`//go:noinline` 禁止内联函数。
`//go:noescape` 禁止逃逸,即该函数用过就销毁,不会逃逸到堆上。
`//go:norace` 跳过竞争检测,`go run -race main.go` 可以检测

time.Sleep 调用流程是 timeSleep→gopark→resetForSleep→resettimer→modtimer→wakeNetPoller。timeSleep 会创建 timer,并把 goroutineReady 设为其方法。gopark 是很关键的方法,里面主要做了几件事:

  1. 获取当前 g 的 m,设置 waitlock 相关参数。
  2. 调用 mcall 来切换协程,会保存当前 g 的 PC/SP ,在 m->g0 堆栈环境里执行 park_m 方法来切换当前 g 的状态并解绑 m,运行 waitlock 的参数即会调用 resetForSleep (里面会将 timer 添加到定时器中)然后清除 ,然后由 schedule 方法来决定下一个 g 调度。

time.Sleep 曾经有个 bug,在 go 1.4 引入抢占调度后,如果在 timer.status 变更成 timerModifying 后,被调度切换走,那么在 gopreempt_m→goschedImpl→schedule→checkTimers->runtimer 里面会无限等待下去。这块在之后加了防止抢占来修复,从防止抢占方法看实际上是个类似读锁的概念。

结论

  1. go 源码很多切换协程和涉及系统调用的方法都是用汇编写的。
  2. go time.Sleep 原理简单来说就是创建 timer 然后让出线程,最后靠定时系统协程来唤醒。
  3. 如果 timeSleep 很短,理论上在 gopark 时就已经过去了,那么它会执行 wakeup。
  4. 可以用 runtime.Gosched() 来让出 cpu。

Linux

IO 中分为 阻塞IO、非阻塞IO、多路IO、信号驱动IO、异步IO。其中阻塞IO、非阻塞IO、多路复用IO、信号驱动IO都属于同步IO。
同步IO都是等待响应,异步IO是去做别的。
模式名称很明显,其中非阻塞IO只是代表不断去查有没有返回值,不代表交出 CPU。

多路复用IO

有 epoll、poll、select 实现。
select 对所有流无差别轮询,有最大限制。
poll 对所有流轮询,没有最大限制。
epoll 会在通过事件 fd 来通知完成。

信号驱动IO

socket 连接,upd 发送通知信号。

异步IO

不需要再去读,等通知到了后直接可以使用。一般线程池实现。

binder 代码

core/java/android/os/下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Binder.java  remote object
IBinder.java remote object interface
ServiceManager.java final class. core class


ServiceManagerNative.java contain ServiceManagerProxy
@UnsupportedAppUsage
public static IServiceManager asInterface(IBinder obj) {
if (obj == null) {
return null;
}
// ServiceManager is never local
return new ServiceManagerProxy(obj);
}

Binder 原理

Binder 原理说白了很简单,就是通过 mmap 来进行内存映射。
进程通信一般几种途径。通过 socket,通过磁盘文件,通过共享内存,管道 pipe。
传统的拷贝是用户空间到内核空间,然后再从内核空间到别的用户空间,管道要4次拷贝,而共享内存要2次拷贝。socket 的拷贝一般来说最少也要4次(简化了网卡)。很明显,最好的是共享内存方式进行进程通信。
而在 android,因为进程通信的频繁。因为 android 的特性,activity 启动,service 启动等等都和跨进程通信息息相关,不过它们通信的是系统进程,且 android 内部多进程配置很简单,应用内使用的也不少。所以 android 思考在这块是否可以在进一步优化。
Binder 的诞生就基于此。Binder 通过 mmap 来映射。不过它是在内存空间的映射,它主要是发送进程内核空间缓冲区,然后另外接收进程内核缓存区映射发送进程。接收进程用户空间在映射自己内核空间的。所以唯一的一次拷贝,是发送进程从自己用户空间拷贝到内核空间。

Binder 设计

跨进程面临问题:

  1. 如何发现对方
  2. 通过什么方式来约定通信协议或者说内容
    其实这是不是和 http 请求一样。你知道网址,但是如何找到对方呢,以及中间的协议内容如何规定。
    在 http 中,是通过域名服务器来帮助你找到地址,就算是 ip,你也要在通过之前通过注册来让别人能在互联网大海里面找到你。总结来说需要一个权威的地方来帮助定位。
    而问题2则是 http 协议规定了通信协议。然后做为接收方,要对外提供端口、api,不提供的话,明显外界只能靠猜测。

我的设计

我之前也设计了一套跨进程的方式,目的是为了解决多进程环境下配置一致性的问题。
一开始我想的是用广播,由 app 来通过配置决定哪个进程来处理配置升级等等问题,然后其他进程启动时候,动态注册接收配置更新的广播,发送广播表示它起来了,然后配置进程接收到后,就存储列表表示当前在线的进程。当配置更新的时候,由配置进程发送广播,各个进程收到后,发送已接收的广播。配置进程接收到全部后则发布广播各个进程各自从文件读取,更新配置。如果配置进程没收到全部的在线进程的广播,则它等待广播超时后就会在进程列表里删除未达到的进程,在发布重新上线的广播,以便来更新进程列表,在重复上述过程。(ps:配置进程是静态注册的广播,以便别的进程上线能拉起配置进程)
但这套机制因为广播太重了(因为广播实际上就是在 AMS 上注册,然后 AMS 接收后按注册列表分发),会加大 anr 概率等原因,这套方案被 ps 了。
既然广播太重了,那我觉得换一种方式,不在使用广播,而使用 binder 来取代广播。它的优势是轻量级的,省去了系统 AMS 工作压力。而且 binder 有一个特性,就是可以注册监听,一旦它死亡,就会回调,来方便配置进程及时删除死亡进程(它的原理没细看,大概是进程注册到监听进程,然后监听进程在死亡时候会在 release时发送通知)。
这么一来就比较完美了,每个进程把自己的 binder 交给配置进程。配置进程在配置更新的时候来遍历列表刷新配置。

android 内部设计

binder 设计基础分为 client、server、driver。通信协议是 server 规定的,client 首先得拿到 binder proxy 对象,然后按协议规定传输数据,数据经由 binder driver 直接复制到 server 的内核空间,因为 server 内核空间已经和自己的用户空间形成映射,所以整个过程只需要拷贝一次即可。即从 client 用户空间拷贝到内核空间。而协议的实现方式,采用了可以由 java 层来定义接口,driver 负责本地 binder 和远程 binder 的传递转化,从而将细节完全隐藏。
在 java 层看来,AIDL 方式显得很奇妙。定义一个 AIDL,然后生成接口文件,只要创建了 Stub 的 Binder 类型对象,就可以拿着这个 Binder 跨进程传递,到处浪了。其他进程获得这个 Binder 就可以调用该进程的方法,其他进程获取到的实际已经是 BinderProxy 对象,C++ 层是 BpBinder 对象,最后实现通过 IPCThreadState 来发送数据并等待返回。
为了可以全局使用,设计了 ServiceManager。ServiceManager 承担类似 dns 的作用,它是在最开始启动的,为其他进程提供查找对应 binder 的服务。

内部使用示例

在 start Activity 时候它调用一开始的 IActivityManager,它中转到唯一一个 ActivityManagerService。

1
2
3
4
5
6
7
8
9
private static final Singleton<IActivityManager> IActivityManagerSingleton =
new Singleton<IActivityManager>() {
@Override
protected IActivityManager create() {
final IBinder b = ServiceManager.getService(Context.ACTIVITY_SERVICE);
final IActivityManager am = IActivityManager.Stub.asInterface(b);
return am;
}
};

结论

  1. Binder 将实现隐藏在了 C++ 代码里,导致 Java 层显得很清晰,使用简单。
  2. Binder 原理说简单其实很简单,就是利用 mmap 内存映射在跨进程时候只进行一次拷贝;但复杂也很复杂,对象转换,在 Java 层无感知的情况下,在反序列化中将对象替换。Server 进程会通过 Binder 线程池里 ioctl 监听来提供服务,Client 进程中线程会等待返回数据,期间涉及到查找服务,数据传递等等。
  3. Binder 里面一些命令 BC_ 开头命令代表进程传递到 driver,BR_ 开头命令代表 driver 传递到进程。

引言

提到缓存一般想到的是用 redis 和 memcache,一提到消息队列一般想到的是 kafka。尤其担心自己太片面了,见识跟不上了,毕竟几年前就在用 kafka 在日志和记录方面,就搜了一下发现现在主流的还是 kafka,还有比较新的 RocketMQ(2016年阿里捐献给 Apache)。ActiveMQ 和 RabbitMQ 一般不考虑,毕竟在吞吐量和可靠性不如前面 kafka。

详解

kafka 的使用很简单,部署好之后各个语言都有 client。以 go 举例。
kafka-go 发消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
conn, err := kafka.DialLeader(s.ctx, "tcp", "localhost:9092", s.topic, s.partition)
if err != nil {
return err
}
defer func() {
if err := conn.Close(); err != nil {
log.Println(err)
}
}()
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))

_, err = conn.WriteMessages(kafka.Message{Value: message})
if err != nil {
return err
}

收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupID: "consumer-group-id",
Topic: defaultTopic,
MinBytes: 10, // 1KB
MaxBytes: 10e6, // 10MB
})
defer r.Close()
for {
currentTime := time.Now()
m, err := r.FetchMessage(s.ctx)
// m, err := r.ReadMessage(context.Background())
if err != nil {
log.Println(err)
// return nil, err
}
message := &model.Message{}
err = proto.Unmarshal(m.Value, message)
if err != nil {
return nil, err
}
if err := r.CommitMessages(s.ctx, m); err != nil {
log.Println("failed to commit messages:", err)
}
log.Println("consume message duration: ", time.Now().Sub(currentTime).Milliseconds())
log.Println(message)
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}

注意点:

  1. 1个 topic 可以由多个 partition,但是一个 partition 只能由一个 consumer 消费。
  2. 1个 consumer 可以订阅多个 topic。会按照一定规则分配。例如 range 尽量平均分配,有余数就从第一个开始多承担一个。
  3. 生产环境肯定要搭建多个 brokers,
  4. 可以通过 kafka-topics --zookeeper localhost:2181 --create --topic kf-test --partitions 4 --replication-factor 3 来创建主题, partitions 一般一个有几兆的吞吐量,结合实际。1个 topic 不要创建太多的 partitions,控制在最多几百。一个 brokers partitions 控制在2000以内。一个集群 partitions 控制在 20000 以内。因为 partitions 太多,当一个 brokers 崩溃后会进行大量的选举。

不足:

  1. 自身不支持消息重试机制。在消息处理失败重试的情况下,只能自己想办法重试。
  2. 消费者 Rebalance 机制,可能会重复消费和影响性能。在消费者崩溃或者主动减少增加、分区增加和topic 订阅变化三种情况下会触发 Rebalance,后2者业务考虑,无法避免,只能尽量减少。但是消费者心跳失联和消费超时的异常情况是需要避免的,通过参数来达到预期效果。具体来看消息心跳最大是超时时间的1/3,消息的消费时长*拉取的消息数量必须远小于设置的拉取消息间隔时间,且该间隔时间小于超时时间。

总结

分布式系统有存在 CAP 特性,即一致性、可用性、分区容错不能同时满足,只能同时满足2个。一般分布式系统都是满足高可用性和分区容错,一致性只能尽量保证最终一致性。kafka 的高吞吐量在应用场景很广泛,因为其分区对象文件的简单架构来确保高的性能。但是想基于它搭建一个完整的消息系统还需要解决不少问题尤其在对消息重试机制上需要结合具体业务分析,一般的可能先消费者自己重试几次,仍失败就发布到重试 topic 单独处理恢复来确保最终的一致性。

栈和队列

java

Stack 继承自 Vector,是线程安全的列表实现。
Deque 是继承自 Queue,Queue 在 Collection 基础上定义了队列的方法,Deque 在 Queue 基础上定义了双向队列的方法。
java 里栈一般推荐用实现了 Deque 的 ArrayDeque 。ArrayDeque 和 Stack 内部实现差不多,都是基于数组实现的,但 ArrayDeque 效率比 Stack 高很多。

  1. Stack 是线程安全的,每个方法都有 synchronized 。
  2. ArrayDeque 内部只在空间不够的时候,进行扩容。而 Stack 其基于 Vector 会不断进行数组的 arraycopy 操作。
    所以其实就算要线程安全,也可以使用 Collections.synchronizedXXX 来处理或者用更高效的类。

go

go 的 mao 和 slice 是我看的比较累的源码,或者说有的根本就没有源码了。原因是有的直接就汇编进行了,有的方法它内嵌到 runtime 中了,没有明显的调用逻辑。
go 里面栈和队列都不用新建类,通过 slice 的一顿切片操作就可以满足。go 中切片原理简单来说就是创建一个指向原数组内存的新的结构体,不会进行拷贝数据等等。但切片不好点在于因为不会实际上删除,所以头部空间会一直存在。

C++

SGI STL 下,stack 会以 deque 为缺省情况的数据结构,也可以额外指定数据结构。所以 stack 不是容器,而是 contain adapter 的概念。其不会提供迭代器。

python3

在 python 中,就比较简单了。作为 stack,数组自带 append 相当于 push,list.pop() 相当于 pop,list[-1] 就是 peek。当然队列也可以这么干,但是性能就很低了,尤其在长数组的时候,因为移除头部数据会导致所有元素位移。队列应该使用 from collections import deque。python 也有切片,但它和 go 不同,它的切片会复制内存,而 go 不会。

一般生产里面用到的少。在高性能的地方会用到,因为树天然的分而治之的思想。

java

HashMap 里面同一个链表超过 8 会转而使用红黑树。PriorityQueue 里面用了最大堆最小堆的概念,实现它用了数组来实现,类似最大堆这种完全二叉树,用列表表示是比较清晰而且省内存。

HashMap

java

HashMap 内部初始化用了泊松分布得出的0.75因子,,扩容会重新计算 hash 来扩展,1.8 开始里面同一个链表超过 8 会转而使用红黑树。java 1.7 并发类使用了分桶加锁。java 1.8 并发类使用了 CAS+synchronized Node 来确保并发,保留了 HashMap 的红黑树等等特性。

go

Map 使用数组+桶+溢出桶链表。 sync.Map 则使用了 dirty,适用于读多写少场景。在其它场景还是直接用锁比较好。

引言

关于高并发的思考一文里面,我提出了高并发会导致的问题,在那边文章里最后的结论是高并发在确保底层存储一致性的情况下,最好通过设计来避免。但实际上业务之间的耦合可能往往无法提前预料到,相互的影响要是很频繁的话,实际上很难通过设计来确保。那么我们是不是就需要一种全局锁或者类似的方案来解决这一情况。

redis 的乐观锁

在 redis 中,实际上存在一种解决方法。就是通过 WATCH、MULTI、EXEC 命令来处理。原理就是 WATCH 来监控某一个变量,然后 MULTI 批量提交一系列的命令已经修改该变量,然后 ECEC 运行。因为 WATCH 的特性,一旦该变量被修改,那么后续的事务就不会运行。因为 MULTI、EXEC 事务的特性,一系列命令会被原子的执行,不会被其他命令插入。
注意点:

  1. 在事务执行前,要是被 redis 检测出错误,那么就不会执行事务。
  2. 但如果一些在事务执行中暴露的错误会被忽略,会继续执行事务,比如对 string 做 list 的操作等等(因为这种错误一般在测试环境发现)。
    不足:
  3. WATCH 时机是应该什么时候呢?如果是在业务开始,那么稍微并发一下,碰撞的概率就会大幅度增加,不断的重试也会导致碰撞。如果是在后面,那么这个代码不确定性会大幅增加也不利于维护。
  4. 错误。这很明显,redis 不管编程问题。那么一旦测试遗漏,出错了的话,很难恢复(这个在兼容处理的时候,感觉出错的可能性还是很高的)。
  5. 局限性。基于 redis 的处理,而一般后面还有一层持久化的关系型存储。这一层在这里无法兼顾到。

seata

本质上是两阶段提交协议。

  1. 先开启本地事务,尝试获取全局锁(超时时间),然后提交事务,插入滚动日志,结果上报。
  2. 若收到回滚请求开启本地事务,根据日志回滚,上报结果。若收到提交请求,放入异步任务队列,提交结果。
    SEATA 提供的长事务解决方案是 Saga 模式,但它的隔离性还是不足。实际上之前讨论的问题仍然存在,但是通过机制来降低了概率,在大部分场景可以正常。但是要求写补偿服务,即回滚接口。

引言

每次出现奇怪的问题,怎么都查不出来的时候,初步看代码也没什么问题,我大概率怀疑是并发导致的。在某一步因为并发导致数据缺失或者被覆盖。
在开发中比较常见的就是资源的查询和更新。如果查询和更新接口并发访问,那么在执行的时候就会有2种可能性。查询在更新前,那么返回给客户端的是旧的数据,服务器后续更新接口返回成功,客户端用哪个就不好说了。查询在更新后,这个是符合预期的结果。
如果是2个更新相关的接口同时并发,那么就更糟糕了。如果业务复杂的话,更新的逻辑相互嵌套,这会就更难以预测到底会发生什么。
多个并发那是噩梦了。

思考

首先是确定程序是最终一致性还是强一致性以及幂等性,要是客户端要是再次触发,怎么避免重复。

一致性问题

解决方法最先想到的就是加锁来确保这一点,但是在业务复杂的场景下很难做,锁多了的话,一个处理不好就可能导致死锁的问题,且性能影响也是比较巨大,想做好很难。全局锁能解决一部分问题。但是十分影响性能。
实际中并发主要问题就是一些公共资源的同时使用。确保修改的时候值是没有被其它修改过的。
主流的主要是乐观和悲观的实现。悲观确保执行的时候只有一个线程涉及公共资源的在运行。乐观是在最后修改的时候在进行对比旧值进行修改。

幂等性即可重复性

一个操作应该是可以重复的,即无论做多少次,产生的结果是一样的。换句话来说就是接口如果参数不变的话,调用多少次都是一样的结果,不会产生别的影响。

举例

在缓存的时候,更新数据库的值和缓存的值。如果先更新数据库,在更新缓存。那么在这期间就存在缓存没有及时更新的情况(部分场景是很致命的)。一种先更新缓存,在更新数据库,这种存在的问题是数据库更新失败的时候,不好处理。还有是先删除缓存,在更新数据库。但这里存在问题是,可能别的线程在删除缓存后又读出数据,导致缓存存储旧值。在优化一点是先删除缓存,更新数据库后,间隔时间在删除一次缓存(时间大于读数据库+写缓存)。这个方案的漏洞是如果延迟删除缓存失败,还是可能存在存储旧值的情况。这里面需要机制来确保,重试机制或者消息机制。重要一点是更新数据库的时候最好加上旧值的判断(乐观锁的概念)。
该例子目标是为了最终一致性。这适用于中间状态不对也不会产生持久化的负面影响的场景。
从点到面,确保底层数据的一致性和操作幂等性,模块的一致性其实也类似,需要考虑失败的情况,比如更新第一个数值成功了,更新第二个数值失败,因为解耦合,没办法简单用事务等方式来处理(且 mysql 事务并不难阻止常见的并发更新,除非用 for update 悲观锁或者手动实现乐观锁)。这种没有通用的做法,因为你没办法要求每次提供更新数据的接口,还要提供回滚的接口。只能尽量从设计层面避免,将影响降到最低。比如失败的情况,增加重试或者标记或者监控等等方式结合来实现程序的稳定。

结论

高并发要想完全解决是一件基本不可能的事情,不可能为了千分之一甚至万分之一的概率来投入大量的成本。在设计层面进行把关是最优解。当然底层数据存储的一致性是肯定要确保的。

0%