从字节Netpoll中学习相关Read问题处理
Append[:0]细节错误
我们先来看一段代码:
package main
import (
"net"
)
func main() {
var conn net.Conn
var buf = make([]byte, 8192)
// reading
for {
n, _ := conn.Read(buf)
... unpacking & handling ...
var i int
for i = 0; i <= n-pkgsize; i += pkgsize {
pkg := append([]byte{}, buf[i:i+pkgsize]...)
go func() {
... handling pkg ...
}
}
buf = append(buf[:0], buf[i:n]...)
}
// writing
var write_datas <-chan []byte
... packing write ...
for {
pkg := <-write_datas
conn.Write(pkg)
}
}这里假设我们有一个 8192 长度和大小的 []byte。
buf := make([]byte, 8192) // len=8192, cap=8192
// 切片表达式:buf[low:high]
// 新切片 = (相同底层数组, len=high-low, cap=cap-low)| 表达式 | len | cap | 底层数组 |
|---|---|---|---|
buf | 8192 | 8192 | 原数组 |
buf[:0] | 0 | 8192 | 原数组 |
buf[100:] | 8092 | 8092 | 原数组 |
buf[:100] | 100 | 8192 | 原数组 |
buf = append(buf[:0], buf[i:n]...)
// ↑ 第一步:buf[:0] 创建 len=0 的切片
// ↑ 第二步:把 buf[i:n] 追加到这个 len=0 的切片// 1. 创建头切片,len=0,但 cap 还是 8192
head := buf[:0]
// head = (ptr=原数组, len=0, cap=8192)
// 2. 追加残留数据(假设 3 字节)
residual := buf[i:n] // len=3
// 3. append 逻辑:如果 cap 够,直接写到原数组
newBuf := append(head, residual...)
// 结果:len=3, cap=8192,底层数组前 3 字节被改成 residual重要的是这个准则: 新切片 = (相同底层数组, len=high-low, cap=cap-low)
这意味着如果像上面这要操作切片,能够修改原切片的各项数据:
// 示例:通过append修改切片
original := make([]byte, 8192)
original[0] = 'A'
slice := original[:0] // len=0,但指向同一数组
newSlice := append(slice, 'X') // append数据到original[0],相当于覆盖掉original[0]
fmt.Println(original[0]) // 'X'而不是'A',原数组被改了正确的做法应该是:
// 正确:保留 len=8192,只移动内容
copy(buf, buf[i:n]) // 把残留拷贝到头部
// buf 的 len 还是 8192,前 (n-i) 字节是有效数据,后面是垃圾总结
| 操作 | len 变化 | 底层数组变化 | 下次 Read 能力 |
|---|---|---|---|
buf[:0] + append | 变成残留长度 | 头部被覆盖 | 只能读 len |
copy + 调整范围 | 不变 | 头部被覆盖 | 可指定范围读 |
为什么要聊这个?
因为字节 netpoll 官方文档里给出了这个示例:
package main
import (
"net"
)
func main() {
var conn net.Conn
var buf = make([]byte, 8192)
// reading
for {
n, _ := conn.Read(buf)
... unpacking & handling ...
var i int
for i = 0; i <= n-pkgsize; i += pkgsize {
pkg := append([]byte{}, buf[i:i+pkgsize]...)
go func() {
... handling pkg ...
}
}
buf = append(buf[:0], buf[i:n]...)
}
// writing
var write_datas <-chan []byte
... packing write ...
for {
pkg := <-write_datas
conn.Write(pkg)
}
}虽然这段代码只是字节为了引出他们 netpoll 相比于传统官方 net 包的优势。我们可以很清晰的看见这段网络数据获取的代码有比较多的问题:
read数据不全
n, _ := conn.Read(buf) 这一句,就有这数据分包的问题,这样读取并不能保证 buf 内的数据被读满。因为 Go 的 conn.Read(buf) 也是 POSIX read 的封装,实际读取的 n 可能 < len,且该做法是用户态 append 拷贝。
如果想详细了解可以看这位大佬对于他学长代码的吐槽,里面有涉及:
简单来说问题如下:
// ❌ 错误:假设一次读满
buf := make([]byte, 1024)
conn.Read(buf) // 可能只读了 100 字节,后面全是 0
process(buf) // 处理了 924 字节的垃圾数据这里只读到 100 字节的原因会有许多情况,例如数据损坏、网络中断等非正常不明原因。
正常来用的话最好是手动循环直至读满,或者用原装包强制读满的调用方法:
func readFull(conn net.Conn, buf []byte) error {
total := 0
for total < len(buf) {
n, err := conn.Read(buf[total:])
if err != nil {
if err == io.EOF && total > 0 {
return nil // 读到了部分数据,正常结束
}
return err
}
total += n
}
return nil
}import "io"
// 强制读满,否则返回错误
_, err := io.ReadFull(conn, buf)
// 或读到 EOF,自动扩容
data, err := io.ReadAll(conn) // 类似 Java 的 readAllBytes未处理错误
这个问题和上面是同处代码:n, _ := conn.Read(buf)
这里按理来说就应该有错误处理,这是很关键的错误处理。如果这里没有错误处理,那么出现各种数据读取情况开发人员都会不知道,代码只能当做正常情况继续处理。
| 场景 | 后果 |
|---|---|
err == io.EOF | 对端关闭,应退出循环,但这里继续读,n=0,死循环 |
err != nil(网络中断) | 应重连或报错,但这里忽略,可能读到脏数据 |
n == 0 && err == nil | 正常但无数据,继续循环,忙等待 CPU 飙升 |
分包逻辑隐患
for i = 0; i <= n-pkgsize; i += pkgsize {
pkg := append([]byte{}, buf[i:i+pkgsize]...) // 内存分配
go func() { ... }() // 每个包一个 goroutine,爆炸风险
}
buf = append(buf[:0], buf[i:n]...) // 残留数据前移| 问题 | 说明 |
|---|---|
| goroutine 泄漏 | 高并发下 go func() 无限制创建,OOM |
| 内存拷贝 | append([]byte{}, ...) 显式拷贝 |
健壮处理应该是:
func handleConn(conn net.Conn) {
defer conn.Close()
buf := make([]byte, 8192)
var residual []byte // 残留未完整包
for {
// ✅ 正确处理错误
n, err := conn.Read(buf)
if err != nil {
if err != io.EOF {
log.Printf("read error: %v", err)
}
break // EOF 或错误都退出
}
if n == 0 {
continue
}
// 合并残留数据
data := append(residual, buf[:n]...)
// 分包处理
var i int
for i = 0; i+pkgsize <= len(data); i += pkgsize {
pkg := data[i : i+pkgsize]
// 用 worker pool,不要无限制 go func
workerPool.Submit(func() {
handlePkg(pkg) // 注意:pkg 是 slice,可能被覆盖,需拷贝
})
}
// 保留未完整包
residual = append([]byte{}, data[i:]...)
}
}字节Netpoll的优化
package main
import (
"github.com/cloudwego/netpoll"
)
func main() {
var conn netpoll.Connection
// reading
reader := conn.Reader()
for {
... unpacking & handling ...
pkg, _ := reader.Slice(pkgsize)
go func() {
... handling pkg ...
pkg.Release()
}
}
// writing
var write_datas <-chan netpoll.Writer
... packing write ...
writer := conn.Writer()
for {
select {
case pkg := <-write_datas:
writer.Append(pkg)
default:
if writer.MallocLen() > 0 {
writer.Flush()
}
}
}
}从上面代码 pkg, _ := reader.Slice(pkgsize) 就可以看得出来,Netpoll 可以直接阻塞读,阻塞直到 pkgsize 字节就绪。不会有之前说的 POSIX read 不能读满的问题,而且 reader.Slice() 是零拷贝,优化了一定的缓存使用。
具体相关实现得去官方仓库看源码了:https://github.com/cloudwego/netpoll/blob/main/connection.go
简洁来说就是:Slice 底层基于 io.Reader.Read,但封装了 waitRead 阻塞语义 + fill 批量预读 + LinkBuffer 零拷贝切片。它把"多次小读"优化成"一次大读多次切片",既兼容标准接口,又实现高性能。
netpoll Slice:
用户要 10 字节 → 检查 LinkBuffer 有 100 字节(上次预读的)
↓
直接零拷贝切片 10 字节返回(无 syscall)
如果不够 → fill() 一次性读 16*4KB=64KB 到缓冲区
↓
下次 Slice 直接从内存取(零拷贝)AI读仓库的代码回复的,具体我也在学习中。
RoLingG | 博客
评论(0)