47

Golang实现并发聊天室

 4 years ago
source link: https://studygolang.com/articles/25502
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

前言以及项目简介

Golang是一门极为出色的语言,近些年也越发火热。每一种语言火起来都有它的道理,Golang就是以其独树一帜的并发处理和高性能博得了众多程序员的青睐,不少的C/C++、Java、PHP开发者都已经或逐渐转型扑向Go语言的怀抱。 从当初刚刚接触Go到现在大概有25天了,一直在看某马的培训视频,也确实学到了不少东西。这个并发聊天室就是他们GO语言与区块链就业班的 阶段性学习项目 。该项目处于整个课程中的第二阶段——并发编程与网络编程,这个并发聊天室就作为此阶段的收尾项目,可见其不容小觑的学习意义和价值。 整个项目的讲解视频共有12节,平均每节十几分钟。我只是看了前三节,包括这个聊天室的功能简介,老师会把项目完整运行一遍,给大家展示其具备的所有功能,还有这个项目的核心部分——并发处理机制,当时我就是在这里反复看了好几遍。然后我就开始做这个项目了,也没去看他给的源码里所有模块的具体实现。感觉有了大体的方向以后我就可以先尝试自己走,到项目这里跟老师跟太紧的话容易给自己弄晕。 历时不到两天也算是把这个项目做完了,我做的时候给分成了两个阶段,第一阶段完成了核心功能:用户进入聊天室后把用户进入的信息广播发送给其他用户,还有广播每位用户发送的信息。这一阶段参照了老师给的源码里manager()这个函数;第二阶段就是剩下的功能模块,像用户改名、用户退出、超时处理这些,在完成超时处理的时候参照了源码里select监听超时这段代码块的位置。 所以说这个项目基本全是我一个字一个字码上去的,至于老师给的源码我也只是参照了一小部分,到现在我也没运行过老师给的源码,而且老师的课程和源码都只有服务器的,没有客户端,我是都做出来了的。本文后面会把我的源码和老师给的源码都贴出来。

门槛

然后说一下这个并发聊天室项目的门槛,或者说它适合什么样水平的人学习。整个项目涉及到的知识有:分支、循环、函数、map、结构体、并发编程、网络编程、select超时处理等等。项目虽然不大,但是涵盖了不少的基础知识,所以非常适合刚看完一本入门书、学完基础的人拿过来练手。至于大佬可以多批评,提提建议,或者直接无视。

项目演示截图

先给大家看一下这个聊天室运行起来后是什么样子,大致有个形象地认识。 视频中的演示截图: EJfIJfj.png!web 视频里的演示就是这个样子的,是不是看起来一点都不像聊天室?哈哈哈哈哈哈,接下来看看 我做的聊天室 : 我演示的时候同时开了三个客户端,用户名分别是用的马化腾、马云、周鸿祎,截图的时候只截了马化腾和周鸿祎的,这些已经能够完整地展示出聊天室支持的各个功能。 马化腾的客户端: nANnAjE.png!webEvInUj6.png!webnAbmAnF.png!web 周鸿祎的客户端: 2eEBNzJ.png!web 我的后台服务器: MRbuUrz.png!web 可以看到我的聊天室对于交互的友好性做了很多努力,加入了一些提示字符,让整个聊天室结构显得更清晰了,后续学完前端还可以进一步完善。需要注意的是最好在Goland-IDE运行我的代码,如果在cmd命令行里运行的话,那些符号都会产生乱码,无法识别。源码会在本文后面给出。

项目流程图

有个逻辑清晰、结构明了的流程图能帮助我们省去很多不必要的麻烦,不然直接开始撸代码的话可能会造成思维混乱,跳不出来。 先来看看培训班给的流程图: vMVnMbV.png!web6JVRVjY.png!web 然后是我的流程图: vmumM3n.png!webuIvMjqm.png!web 其实对于老师给的流程图我基本上没怎么细看,那就讲讲我自己的流程图吧,第一张图基本上是所有TCP数据传输的整体流程,没什么好说的,学完TCP都应该掌握,只有那个manager函数是这个项目里独有的。第二张图先从客户端开始看,因为服务器在循环监听着通信套接字,客户端是主动发送消息的一端,客户端的消息对应图上绿色的小对话框,服务器收到这个消息后经过一系列处理变成黄色的小对话框,随后经过三个channel又被返回给了客户端。这就是并发聊天室的核心——并发处理流程。

各个功能模块详细说明

全局变量以及关键函数

  1. var message = make(chan []byte); 这是一个无缓冲channel,所有客户端发到服务器的消息经过处理后都会被写入这个channel
  2. manager()函数,这是我们项目里的管家,专门负责监听着全局channel——message的读端,一旦有消息写进来,manager就负责把消息广播给所有在线用户。
//管家循环监听管道message
func manager() {
	for {
		select {
		case msg := <-message:
			for _, v := range onlineUsers {
				v.C <- msg
			}
		}
	}
}
  1. 存储用户信息的结构体,其中C专门处理用户发送的消息,NewUser专门处理用户进入聊天室和退出聊天室的信息
//管家循环监听管道message
type userInfo struct {
	name    string
	C       chan []byte
	NewUser chan []byte //用于广播用户进入或退出当前聊天室的信息
}
  1. var onlineUsers = make(map[string]userInfo) 这个map用于存储所有的在线用户,键是该用户的ip+port,值就是第三条说的结构体
  2. func HandleConnect(conn net.Conn) 这是专门用于处理服务器与单独客户端之间读写的函数它和accept函数都被放在一个死循环里,accept一旦受到客户端连接请求并连接成功,就会启动go程去执行这个函数,与此同时主go程就循环回去继续阻塞在accept函数,监听着其他的客户端连接请求。

广播用户上线

客户端连接成功后,我会要求他先输入一个用户名,然后客户端把这个用户名使用write发送给服务器,服务器收到这个用户名以后就把他存到全局map里面,把用户名拿来再加上一些提示信息发送给每位用户的channel——NewUser,然后立刻启动go程去监听每个用户自己的channel,一旦有消息写进来,服务器就把从channel里读出来的用户上线信息用write发送给客户端。

广播用户消息

与之前处理广播用户上线类似,

msg = append([]byte(":speech_balloon:["+thisUser+"]对大家说:"), buf2[:n]...)

但是这里thisUser不能和上面的读到的用户名一致了,当时在这个问题上困扰了好久。就是说现在面临的问题是:当多个客户端同时连接到服务器,此时服务器如何判断读到的信息来自哪个客户端?我的答案是:当服务器读到客户端发来的消息后,立刻调用conn.RemoteAddr()即可获取当前发送消息的客户端的地址。

展示在线用户名

同样在服务器循环阻塞监听读取客户端消息的后面,加上一个switch分支结构,如果读来的消息和“who”相等的话,就用for遍历一下全局的map

修改用户名

指定用户修改用户名的方法是“rename|”+新用户名,只需在上一条的switch分支里加上case,消息的前7个字符和“rename|”相等的话,就执行后续改名的代码。需要注意的是,这里处理完成后紧接着还要在广播用户消息的地方改一下,不然服务器会把“who”、“rename|”这些东西都给广播出去。

用户退出

用户退出主要有那么几种情形:

  1. 用户关闭了客户端,或者用户手动停止了当前程序的运行。这个情形不难处理,还是在上面说的那个switch分支里面,加上一个case,如果服务器读出来的内容是0,就说明客户端已经断开连接了。相应的代码片段:
n, err := conn.Read(buf2)
			//用于存储当前与服务器通信的客户端上的那个同户名
			thisUser := onlineUsers[conn.RemoteAddr().String()].name
			switch {
			case n == 0:
				fmt.Println(conn.RemoteAddr(), "已断开连接")
				for _, v := range onlineUsers {
					if thisUser != "" {
						v.NewUser <- []byte(":dash:用户[" + thisUser + "]已退出当前聊天室\n")
					}
				}
  1. 如果服务器主动在HandleConnect()函数里执行了return,或者服务器程序关闭、停止运行,也会导致客户端的退出,这种情形对应的代码是客户端循环读取服务器消息时,读出内容的长度为0,和上一条是相对的。代码片段:
for {
		buffer2:=make([]byte,4096)
		n,err:=conn.Read(buffer2)
		if n==0{
			fmt.Println("服务器已关闭当前连接,正在退出……")
			return
		}
		if err!=nil{
			fmt.Println("conn.Read error:",err)
			return
		}
		fmt.Print(string(buffer2[:n]))

	}
  1. 第三种情形就是用户长时间没有发送消息,服务器会有一个超时处理的select,负责把这样的用户踢出聊天室。具体内容在下一个超时处理模块分析。

超时处理

当时在解决这个问题的时候也是琢磨了好久尝试过很多种channel,放到过很多位置也没能实现。最后的解决思路是这样的:服务器中有一个go程专门负责循环监听着客户端发来的消息,如果客户端没有任何动作,服务器就会在相应的conn.Read()处阻塞,所以应在HandleConnect函数的开头处定义一个控制超时的channel——overTime,当服务器读到客户端消息后,再给overTime写入“true”,那么负责监听overTime输出端和超时的select代码块就只能放在这个go程的外面。 相应代码片段:

for {
		select {
		case <-overTime:
		case <-time.After(time.Second * 60):
			_, _ = conn.Write([]byte("抱歉,由于长时间未发送聊天内容,您已被系统踢出"))
			thisUser := onlineUsers[conn.RemoteAddr().String()].name
			for _, v := range onlineUsers {
				if thisUser != "" {
					v.NewUser <- []byte(":dash:用户[" + thisUser + "]由于长时间未发送消息已被踢出当前聊天室\n")
				}
			}
			delete(onlineUsers, conn.RemoteAddr().String())
			return
		}
	}

只有不断向overTime写入数据,这个select才不会进入计时。当overTime写入了数据后,这个case不作为,意味着立即重新执行循环,进入计时,也就是老师讲课时说的 重置计时器

总结

本文只对服务器的功能模块进行了详细说明,实际开发的时候我是服务器客户端同时来写的,如果你能完全看明白服务器的代码,那么客户端的代码就很容易了,所以这里对于客户端不做介绍。 我在整个项目中最耗时的部分是:整个并发机制的理解,就是说所有这5个channel各自的作用,怎么协同运行的,需要动脑筋去思考;然后是select超时处理那部分,之前我把管家manager放到了监听客户端消息的那个go程里面了,带来了很多麻烦。其实管家manager只需负责监听全局channel,不必写在HandleConnect里面,而且它里面用到的变量也基本都是全局变量,所以完全可以把它放到HandleConnect()外面,单独作为一个go程去运行。由此给我带来的教训是:如果一个函数或代码块与另一个函数之间不存在绝对的关联性,就不要放在另一个函数里面,否则就会产生相互依赖,进而带来麻烦,当一个项目的逻辑越来越复杂,能做到这一点还是很不容易的;还有就是用户修改名称那部分,牵扯出来很多逻辑判断,这里也调试了很久;再然后就是各种小bug了,可以说是不计其数,最后也是一点点得到了解决。

源码

最后给大家贴出我的这个并发聊天室源码和老师给的源码。老师的只有服务器,我的源码里服务器和客户端都有(知道你们都在等这个,哈哈哈哈哈)。先来看我的源码吧。 我的服务器源码:

package main

import (
	"fmt"
	"net"
	"time"
)

//定义一个全局的channel,用于处理从各个客户端读到的消息
var message = make(chan []byte)

//定义一个结构体userInfo,用于存储每位聊天室用户的信息(名称+用户各自的管道C)
type userInfo struct {
	name    string
	C       chan []byte
	NewUser chan []byte //用于广播用户进入或退出当前聊天室的信息
}

//定义一个map,用于存储聊天室中所有在线的用户和用户信息
var onlineUsers = make(map[string]userInfo)

func main() {
	listener, err := net.Listen("tcp", "127.0.0.1:8011")
	if err != nil {
		fmt.Println("net.Listen error:", err)
		return
	}
	fmt.Println("够浪聊天室-服务器已启动")

	fmt.Println("正在监听客户端连接请求……")

	//启动管家go程,不断监听全局channel————message
	go manager()

	for {
		conn, err := listener.Accept()
		if err != nil {
			fmt.Println("listener.Accept error:", err)
			return
		}
		fmt.Printf("地址为[%v]的客户端已连接成功\n", conn.RemoteAddr())
		// 如果监听到连接请求并成功以后,
		// 服务器进入下面的go程,
		// 在该go程中处理服务器和该客户端之间的读写或其他事件
		// 与此同时,服务器在主go程中回去继续监听着其他客户端的连接请求
		go HandleConnect(conn)
	}

}

// 这个函数完成服务器对一个客户端的整套处理流程
func HandleConnect(conn net.Conn) {
	defer conn.Close()
	// 管道overTime用于处理超时
	overTime := make(chan bool)

	// 用于存储用户名信息
	buf1 := make([]byte, 4096)
	n, err := conn.Read(buf1)
	if err != nil {
		fmt.Println("conn.Read error:", err)
		return
	}
	userName := string(buf1[:n]) //n-1是为了去掉末尾的\n
	perC := make(chan []byte)
	perNewUser := make(chan []byte)
	user := userInfo{name: userName, C: perC, NewUser: perNewUser}
	onlineUsers[conn.RemoteAddr().String()] = user
	fmt.Printf("用户[%s]注册成功\n", userName)
	_, _ = conn.Write([]byte(":heart_decoration::heartbeat::sparkling_heart::revolving_hearts::yellow_heart:你好," + userName + ",欢迎来到『够浪』™聊天室,请畅所欲言!:gift_heart::cupid::heartpulse::two_hearts::heartpulse:"))
	//广播通知。遍历map
	go func() {
		for _, v := range onlineUsers {
			v.NewUser <- []byte("  用户[" + userName + "]已加入当前聊天室\n")
		}
	}()

	//监听每位用户自己的channel
	go func() {
		for {
			select {
			case msg1 := <-user.NewUser:
				_, _ = conn.Write(msg1)
			case msg2 := <-user.C:
				_, _ = conn.Write(msg2)

			}
		}
	}()

	//循环读取客户端发来的消息
	go func() {
		buf2 := make([]byte, 4096)
		for {
			n, err := conn.Read(buf2)
			//用于存储当前与服务器通信的客户端上的那个同户名
			thisUser := onlineUsers[conn.RemoteAddr().String()].name
			switch {
			case n == 0:
				fmt.Println(conn.RemoteAddr(), "已断开连接")
				for _, v := range onlineUsers {
					if thisUser != "" {
						v.NewUser <- []byte(":dash:用户[" + thisUser + "]已退出当前聊天室\n")
					}

				}
				delete(onlineUsers, conn.RemoteAddr().String())
				return
			case string(buf2[:n]) == "who\n":
				_, _ = conn.Write([]byte("当前在线用户:\n"))
				for _, v := range onlineUsers {
					//fmt.Println(v.name)
					_, _ = conn.Write([]byte("  " + v.name + "\n"))
				}
			case len(string(buf2[:n])) > 7 && string(buf2[:n])[:7] == "rename|":
				//n-1去掉buf2里的空格
				onlineUsers[conn.RemoteAddr().String()] = userInfo{name:string(buf2[:n-1])[7:],C: perC, NewUser: perNewUser}
				_, _ = conn.Write([]byte("您已成功修改用户名!\n"))
			}

			if err != nil {
				fmt.Println("conn.Read error:", err)
				return
			}

			var msg []byte
			if buf2[0] != 10 && string(buf2[:n]) != "who\n" {
				if len(string(buf2[:n])) <= 7 || string(buf2[:n])[:7] != "rename|" {
					msg = append([]byte(":speech_balloon:["+thisUser+"]对大家说:"), buf2[:n]...)
				}

			} else {
				msg = nil
			}
			//
			overTime <- true
			message <- msg
		}

	}()

	for {
		select {
		case <-overTime:
		case <-time.After(time.Second * 60):
			_, _ = conn.Write([]byte("抱歉,由于长时间未发送聊天内容,您已被系统踢出"))
			thisUser := onlineUsers[conn.RemoteAddr().String()].name
			for _, v := range onlineUsers {
				if thisUser != "" {
					v.NewUser <- []byte(":dash:用户[" + thisUser + "]由于长时间未发送消息已被踢出当前聊天室\n")
				}
			}
			delete(onlineUsers, conn.RemoteAddr().String())
			return
		}
	}

}

//管家循环监听管道message
func manager() {
	for {
		select {
		case msg := <-message:
			for _, v := range onlineUsers {
				v.C <- msg
			}
		}
	}
}

我的客户端源码:

package main

import (
	"fmt"
	"net"
	"os"
)

func main() {
	fmt.Println("正在连接服务器……")
	conn,err:=net.Dial("tcp","127.0.0.1:8011")
	if err!=nil{
		fmt.Println("net.Dial error:",err)
		return
	}
	defer conn.Close()
	fmt.Println("连接服务器成功")

	fmt.Println("先起一个名字吧:")
	var userName string
	//使用Scan输入,不允许出现空格
	_, _ = fmt.Scan(&userName)
	_, _ = conn.Write([]byte(userName))

	buf2:=make([]byte,4096)
	n, err := conn.Read(buf2)
	if err!=nil{
		fmt.Println("conn.Read error:",err)
		return
	}
	// 客户端收到“你好,***,欢迎来到够浪聊天室,请畅所欲言!”
	fmt.Println(string(buf2[:n]))
	fmt.Println("⚠提示:长时间没有发送消息会被系统强制踢出")

	//客户端发送消息到服务器
	go func() {
		for {
			buffer1:=make([]byte,4096)
			//这里使用Stdin标准输入,因为scanf无法识别空格
			n,err:=os.Stdin.Read(buffer1)
			if err!=nil{
				fmt.Println("os.Stdin.Read error:",err)
				continue
			}
			_, _ = conn.Write(buffer1[:n])   //写操作出现error的概率比较低,这里省去判断
		}
	}()
	//接收服务器发来的数据
	for {
		buffer2:=make([]byte,4096)
		n,err:=conn.Read(buffer2)
		if n==0{
			fmt.Println("服务器已关闭当前连接,正在退出……")
			return
		}
		if err!=nil{
			fmt.Println("conn.Read error:",err)
			return
		}
		fmt.Print(string(buffer2[:n]))

	}

}

老师给的服务器源码:

package main

import (
	"net"
	"fmt"
	"strings"
	"time"
)
// 创建用户结构体类型!
type Client struct {
	C chan string
	Name string
	Addr string
}

// 创建全局map,存储在线用户
var onlineMap map[string]Client

// 创建全局 channel 传递用户消息。
var message = make(chan string)

func WriteMsgToClient(clnt Client, conn net.Conn)  {
	// 监听 用户自带Channel 上是否有消息。
	for msg := range clnt.C {
		conn.Write([]byte(msg + "\n"))
	}
}

func MakeMsg(clnt Client, msg string) (buf string) {
	buf = "[" + clnt.Addr + "]" + clnt.Name + ": " + msg
	return
}

func HandlerConnect(conn net.Conn)  {
	defer conn.Close()
	// 创建channel 判断,用户是否活跃。
	hasData := make(chan bool)

	// 获取用户 网络地址 IP+port
	netAddr := conn.RemoteAddr().String()
	// 创建新连接用户的 结构体. 默认用户是 IP+port
	clnt := Client{make(chan string), netAddr, netAddr}

	// 将新连接用户,添加到在线用户map中. key: IP+port value:client
	onlineMap[netAddr] = clnt

	// 创建专门用来给当前 用户发送消息的 go 程
	go WriteMsgToClient(clnt, conn)

	// 发送 用户上线消息到 全局channel 中
	//message <- "[" + netAddr + "]" + clnt.Name + "login"
	message <- MakeMsg(clnt, "login")

	// 创建一个 channel , 用来判断用退出状态
	isQuit := make(chan bool)

	// 创建一个匿名 go 程, 专门处理用户发送的消息。
	go func() {
		buf := make([]byte, 4096)
		for {
			n, err := conn.Read(buf)
			if n == 0 {
				isQuit <- true
				fmt.Printf("检测到客户端:%s退出\n", clnt.Name)
				return
			}
			if err != nil {
				fmt.Println("conn.Read err:", err)
				return
			}
			// 将读到的用户消息,保存到msg中,string 类型
			msg := string(buf[:n-1])

			// 提取在线用户列表
			if msg == "who" && len(msg) == 3 {
				conn.Write([]byte("online user list:\n"))
				// 遍历当前 map ,获取在线用户
				for _, user := range onlineMap {
					userInfo := user.Addr + ":" + user.Name + "\n"
					conn.Write([]byte(userInfo))
				}
				// 判断用户发送了 改名 命令
			} else if len(msg) >=8 && msg[:6] == "rename" {		// rename|
				newName := strings.Split(msg, "|")[1]		// msg[8:]
				clnt.Name = newName								// 修改结构体成员name
				onlineMap[netAddr] = clnt						// 更新 onlineMap
				conn.Write([]byte("rename successful\n"))
			}else {
				// 将读到的用户消息,写入到message中。
				message <- MakeMsg(clnt, msg)
			}
			hasData <- true
		}
	}()

	// 保证 不退出
	for {
		// 监听 channel 上的数据流动
		select {
		case <-isQuit:
			delete(onlineMap, clnt.Addr)		// 将用户从 online移除
			message <- MakeMsg(clnt, "logout")   // 写入用户退出消息到全局channel
			return
		case <-hasData:
			// 什么都不做。 目的是重置 下面 case 的计时器。
		case <-time.After(time.Second * 60):
			delete(onlineMap, clnt.Addr)       // 将用户从 online移除
			message <- MakeMsg(clnt, "time out leaved") // 写入用户退出消息到全局channel
			return
		}
	}
}

func Manager()  {
	// 初始化 onlineMap
	onlineMap = make(map[string]Client)

	// 监听全局channel 中是否有数据, 有数据存储至 msg, 无数据阻塞。
	for {
		msg := <-message

		// 循环发送消息给 所有在线用户。要想执行,必须 msg := <-message 执行完, 解除阻塞。
		for _, clnt := range onlineMap {
			clnt.C <- msg
		}
	}
}

func main()  {
	// 创建监听套接字
	listener, err := net.Listen("tcp", "127.0.0.1:8000")
	if err != nil {
		fmt.Println("Listen err", err)
		return
	}
	defer listener.Close()

	// 创建管理者go程,管理map 和全局channel
	go Manager()

	// 循环监听客户端连接请求
	for {
		conn, err := listener.Accept()
		if err != nil {
			fmt.Println("Accept err", err)
			return
		}
		// 启动go程处理客户端数据请求
		go HandlerConnect(conn)
	}
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK