golang分布式存储 读书笔记(2)——流操作之PutStream封装
source link: https://studygolang.com/articles/18146?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
接着上一篇的 golang分布式存储 读书笔记(1)——流操作之GetStream封装 ,这次要讲的是上传文件并保存,使用 restful
的 PUT
方法,书中封装了 PutStream
结构。
接口设计
客户端上传数据时向接口服务器发送 PUT
请求,请求的 url
为 /objects/<object_name>
。
同样接口服务器向数据服务器转发 PUT
请求,请求的 url
为 /objects/<object_name>
。数据服务器在本地指定文件夹( D:/objects/
)下创建 <object_name>
文件,将 PUT
的内容写入文件中。
目录结构
在 GOPATH/src
目录下,目录结构为:
go-storage apiServer objects get.go put.go apiServer.go dataServer dataServer.go
数据服务器实现
dataServer代码
数据服务器的 put
接口和 get
接口很类似,只不过将读文件改为了写文件。
package main import ( "net/http" "io" "os" "log" "strings" ) const ( objectDir = "D:/objects/" ) func Handler(w http.ResponseWriter, r *http.Request) { m := r.Method log.Println(m) if m == http.MethodGet { // get(w, r) return } else if m == http.MethodPut { put(w, r) return } w.WriteHeader(http.StatusMethodNotAllowed) } func put(w http.ResponseWriter, r *http.Request) { // 提取文件名 fname := strings.Split(r.URL.EscapedPath(), "/")[2] log.Println(fname) // 创建文件 f, e := os.Create(objectDir + fname) if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) return } defer f.Close() // 往文件写入数据 io.Copy(f, r.Body) } func main() { http.HandleFunc("/objects/", Handler) http.ListenAndServe(":8889", nil) }
io.Copy(f, r.Body)
将 r.body
的数据写入 f
中,其中 r.Body
实现了 io.ReadCloser
接口, f
实现了 io.Writer
接口。最后要记得关闭文件。
测试
使用 Restlet Client
发送 PUT
请求进行测试。
测试数据服务器.png
往 http://localhost:8889/objects/1.txt
发送 PUT
请求,可以看到本地确实生成了 1.txt
文件。
接口服务器实现
版本一
实现PUT请求
可以使用 http.NewRequest
构造一个 PUT
请求,使用 http.Client
构造一个客户端进行发送。
例如:
request, _ := http.NewRequest("PUT","http://127.0.0.1:8889/objects/"+object, reader) client := http.Client{} r, e := client.Do(request) // 发送并接受请求
完整代码如下:
package main import ( "net/http" "io" "strings" "go-storage/apiServer/objects" "log" ) const dataServerAddr = "http://localhost:8889/objects/" func Handler(w http.ResponseWriter, r *http.Request) { m := r.Method if m == http.MethodGet { // get(w, r) return } else if m == http.MethodPut { put(w, r) return } w.WriteHeader(http.StatusMethodNotAllowed) } func put(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() object := strings.Split(r.URL.EscapedPath(), "/")[2] request, _ := http.NewRequest("PUT",dataServerAddr + object, r.Body) client := http.Client{} resp, e := client.Do(request) // 发送并接受请求 if e == nil && resp.StatusCode != http.StatusOK { w.WriteHeader(http.StatusNotFound) log.Printf("dataServer return http code %d", resp.StatusCode) return } defer resp.Body.Close() io.Copy(w, resp.Body) } func main() { http.HandleFunc("/objects/", Handler) http.ListenAndServe(":8888", nil) }
测试
接口服务器测试.png
往 http://localhost:8888/objects/2.txt
发送 PUT
请求,可以看到本地确实生成了 2.txt
文件。
版本二——封装版本
书上的代码实现的相对复杂一点,将整个操作封装成了一个 PutStream
的结构体,先看结构体的具体成员:
type PutStream struct { writer *io.PipeWriter c chan error }
可以看到其中一个成员是 io.PipeWriter
类型,这类似于 linux
中的管道,一端写入,一读取读取,这里是为了在两个协程之间建立连接,这里使用 channel
并不好使。
另一个成员是 channel
类型,因为子协程不能有返回值,所以这里用通道传递错误。
PutStream
的构造函数如下:
func NewPutStream(server, object string) *PutStream { reader, writer := io.Pipe() // 通过管道将 两个协程 联系起来 (用channel应该也可以把?) c := make(chan error) go func() { request, _ := http.NewRequest("PUT", "http://"+server+"/objects/"+object, reader) client := http.Client{} r, e := client.Do(request) // 如果 reader一直没有数据,是不是 Do就会阻塞? if e == nil && r.StatusCode != http.StatusOK { e = fmt.Errorf("dataServer return http code %d", r.StatusCode) } c <- e }() return &PutStream{writer, c} }
先使用 io.Pipe()
构造一个 writer
和 reader
,再初始化一个通道 c
,最后使用 writer
和 c
构造一个 PutStream
指针对象返回。
中间开启了一个子协程,该协程构造一个 http.NewRequest
,并使用 http.Client
构造客户端发送请求。其中 request
构造的时候使用了管道的读端 reader
,此时该管道的读端并没有数据,但是 http.NewRequest
构造 request
的时候并不会阻塞,而是会阻塞在 client.Do(request)
这句代码,直到管道的写端写入数据。
如果得到的响应出错了,将错误写入管道 c
中。
同时这个 PutStream
需要实现 io.Writer
和 io.Writer
接口:
// 实现了 io.Writer接口 func (w *PutStream) Write(p []byte) (n int, err error) { return w.writer.Write(p) } // 关闭流并得到错误 func (w *PutStream) Close() error { w.writer.Close() // io.PipeWriter 关闭, reader也会关闭? client.Do(request)才能结束? return <-w.c }
由于功能是要上传并保存一个对象,所以实现一个 StoreObject
方法来调用 PutStream
。
func StoreObject(r io.Reader, object string) (int, error) { stream := NewPutStream(data_server, object) // 会阻塞,直到r中收到 EOF,stream实现了io.Writer接口 io.Copy(stream, r) // 将r的内容拷贝的 stream 中,stream有数据的时候,他对应的reader也就有了数据 // 会阻塞到 stream中的c channel收到消息 e := stream.Close() if e != nil { return http.StatusInternalServerError, e } return http.StatusOK, nil }
新建一个 PutStream
指针类型的 stream
,由于它实现了 io.Writer
接口,所以可以调用 io.Copy
将 r
中的内容复制到 stream
中,其实就是写入到管道的写端。最后关闭流,管道写端写入也就结束,读端也读取结束,子协程的发送也就结束了。
其实该版本的实现和版本一是一样的,只不过多了一个子协程,多使用了 io.Pipe
管道。
看起来其实版本一更加简单、直接。暂时也看不出封装的优势,也许在后面功能越来越复杂的时候,就可以体现这个优势。个人感觉这个封装还是比较优雅。
完整代码
package objects, put.go
package objects import ( "net/http" "fmt" "io" ) type PutStream struct { writer *io.PipeWriter c chan error } const ( data_server = "127.0.0.1:8889" ) func StoreObject(r io.Reader, object string) (int, error) { stream := NewPutStream(data_server, object) // 会阻塞到 r中收到 EOF stream实现了io.Writer接口 io.Copy(stream, r) // 将r的内容拷贝的 stream 中,stream有数据的时候,他对应的reader也就有了数据 // 会阻塞到 stream中的c channel收到消息 e := stream.Close() if e != nil { return http.StatusInternalServerError, e } return http.StatusOK, nil } func NewPutStream(server, object string) *PutStream { reader, writer := io.Pipe() // 通过管道将 两个协程 联系起来 (用channel应该也可以把?) c := make(chan error) go func() { request, _ := http.NewRequest("PUT", "http://"+server+"/objects/"+object, reader) client := http.Client{} r, e := client.Do(request) // 如果 reader一直没有数据,是不是 Do就会阻塞? if e == nil && r.StatusCode != http.StatusOK { e = fmt.Errorf("dataServer return http code %d", r.StatusCode) } c <- e }() return &PutStream{writer, c} } // 实现了 io.Writer接口 func (w *PutStream) Write(p []byte) (n int, err error) { return w.writer.Write(p) } func (w *PutStream) Close() error { w.writer.Close() // io.PipeWriter 关闭, reader也会关闭? client.Do(request)才能结束? return <-w.c }
package main, apiServer.go
package main import ( "net/http" "io" "strings" "go-storage/apiServer/objects" "log" ) const dataServerAddr = "http://localhost:8889/objects/" func Handler(w http.ResponseWriter, r *http.Request) { m := r.Method if m == http.MethodGet { // get(w, r) return } else if m == http.MethodPut { put(w, r) return } w.WriteHeader(http.StatusMethodNotAllowed) } func put(w http.ResponseWriter, r *http.Request) { // object 要保存的对象名 object := strings.Split(r.URL.EscapedPath(), "/")[2] c, e := objects.StoreObject(r.Body, object) if e != nil { log.Println(e) } w.WriteHeader(c) } func main() { http.HandleFunc("/objects/", Handler) http.ListenAndServe(":8888", nil) }
测试过程同版本一。
疑问
- 如果传输一个
4g
的文件,到底需不需要占用4g
内容?在上一篇文章中我认为这种流操作可以减小内存占用,但是现在不太确定。
参考
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK