

从头再读取 io.Reader: 覆水难收?
source link: https://colobu.com/2023/09/24/reread-the-io-Reader/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

从头再读取 io.Reader: 覆水难收?
前几天,我们百度的同学分享了Go标准库中一段好玩的好玩的代码, net/http/response.go
中一段检查HTTP的headser中Content-Length
未设置的情况下,对http.Body
的有趣的处理。
我们知道io.Reader
提供了Read
方法,并没有将读取的数据再塞回去的方法,而且对于流式的Reader,也绝无可能将数据塞回去。就像覆水难收一样,泼出去的水,没办法收回来了。
如果我们想从Reader读取一部分字节,做一些处理(一般是做一些检查),然后想再让调用者从头开始读取咋办?
net/http/response.go
中就有这么一段代码
// Clone it, so we can modify r1 as needed.
r1 := new(Response)
*r1 = *r
if r1.ContentLength == 0 && r1.Body != nil {
// Is it actually 0 length? Or just unknown?
var buf [1]byte
n, err := r1.Body.Read(buf[:])
if err != nil && err != io.EOF {
return err
if n == 0 {
// Reset it to a known zero reader, in case underlying one
// is unhappy being read repeatedly.
r1.Body = NoBody
} else {
r1.ContentLength = -1
r1.Body = struct {
io.Reader
io.Closer
io.MultiReader(bytes.NewReader(buf[:1]), r.Body),
r.Body,
这段代码主要是针对响应(Response)对象做了一次克隆(Clone),目的是为了能够安全地修改响应对象,而不影响原始的响应对象。
具体来看:
- r1 := new(Response) 创建一个新的响应对象
- r1 = r 让r1成为原始响应r的克隆
- 接下来判断如果响应内容长度r1.ContentLength == 0 且响应体r1.Body != nil
- 这说明内容长度标记为0,但实际上响应体不为空。这种情况下无法确定内容究竟是0长度还是长度未知。
- 所以读取1字节到buf,以判断响应体是否真是0长度。
- 如果读取到EOF,说明响应体确实长度为0,将Body重置为NoBody。
- 否则说明长度未知,将ContentLength设置为-1,并用MultiReader将已读取的1字节内容和原Body组合,作为新Body。
- 这样通过克隆的响应对象r1,可以安全地修改ContentLength和Body,而不影响原始响应对象r。
这段代码通过克隆请求对象,巧妙地处理了内容长度标记为0但实际有内容的情况,避免了对原始响应对象的修改。
它先读取了1个字节,来判断Body是否为空,不为空在通过io.MultiReader(bytes.NewReader(buf[:1]), r.Body),
把这一个字节和原来的io.Reader(r.Body
)在捏合在一起,形成一个新的io.Reader。
通过io.MultiReader
新建一个 io.Reader,就可以把已读取的字节和剩余未读取的字节组合起来,形成都未读取的Reader。
标准库net/http/transfer.go
中也有一段相同的逻辑处理。
这让我想起了soheilhy/cmux, rpcx最早使用它在一个端口上提供不同传输的协议。
cmux也是预先读取一部分数据,和预先配置的Matcher进行匹配,如果匹配成功,比如HTTP1.1协议,那么这个连接后续就按照HTTP1.1协议进行解析。那么预先读取的这些字节也得交给解析器从头开始解析,否则数据就缺失了,那么它是怎么实现的呢?
cmux使用老二另外一个方法,它创建了一个bufferedReader:
type bufferedReader struct {
source io.Reader
buffer bytes.Buffer
bufferRead int
bufferSize int
sniffing bool
lastErr error
当连接开始在侦探和哪个Matcher匹配的时候, conn连接会把数据写入到这个buffer中:
func (s *bufferedReader) Read(p []byte) (int, error) {
if s.bufferSize > s.bufferRead { // buffer中有未读的数据,先读取这个
bn := copy(p, s.buffer.Bytes()[s.bufferRead:s.bufferSize])
s.bufferRead += bn
return bn, s.lastErr
} else if !s.sniffing && s.buffer.Cap() != 0 {
s.buffer = bytes.Buffer{}
// 从原始的conn中读取
sn, sErr := s.source.Read(p)
if sn > 0 && s.sniffing { // 如果还在侦探状态,把读取的数据写入到buffer中
s.lastErr = sErr
if wn, wErr := s.buffer.Write(p[:sn]); wErr != nil {
return wn, wErr
return sn, sErr
一旦侦探完成(match一个协议),那么就会把读取的指针置为最开始的地方,从头开始读取,根据上面的方法的逻辑,读取完buffer就从原始conn中读取,也不会再往buffer中写。
func (m *MuxConn) doneSniffing() {
m.buf.reset(false)
func (s *bufferedReader) reset(snif bool) {
s.sniffing = snif
s.bufferRead = 0 // 退回到原点,从最开始的数据开始读
s.bufferSize = s.buffer.Len() // 已读取的数据
通过这种方式,也实现了预读取的功能。
看起来,在Go语言中,还真的能覆水再回收。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK