12

goim 中的 watchOps 在监听什么?

 4 years ago
source link: https://studygolang.com/articles/26758
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

问题在这里 github.com/Terry-Mao/g…

MFNJn22.png!web

提到的询问是:

对于watchOps的理解,理解成房间应该是有偏差的。

1、watchOps = watchOperate的切片,意思是监听操作的意思,而不是watchRooms。

2、里面的NeedPush方法,来判断是否拥有这个Operate,才能推送。

3、Operate刚好和协议里面的Op都是int32,也是操作id的意思。

4、roomid是string类型。

综上所述,应该是当登录成功,每个ch都拥有一组Operate。当广播消息的时候,判断是否监听这个操作,有监听,就推送下去。

当然完全当做房间id来用,也是完全能实现监听其他房间消息的作用,只是从我上面提的1、4这2点来看,不像房间id的意思。

提到我, 而有此答. 以此答来再看一些源代码的前后关联阅读. 以下为答复.

这么老的讨论依然活跃啊............

watchOps 这里是不是房间呢? 有可能不是...............

看代码:

operation 的定义, 来自这里

syntax = "proto3";

package goim.comet;
option go_package = "grpc";

import "github.com/gogo/protobuf/gogoproto/gogo.proto";

/*
 * v1.0.0
 * protocol
 */
message Proto {
    int32 ver = 1 [(gogoproto.jsontag) = "ver"];
    int32 op = 2 [(gogoproto.jsontag) = "op"];
    int32 seq = 3 [(gogoproto.jsontag) = "seq"];
    bytes body = 4 [(gogoproto.jsontag) = "body"];
}
复制代码

具体的 operation 值定义在这里 /api/comet/grpc/operation.go

package grpc

const (
	// OpHandshake handshake
	OpHandshake = int32(0)
	// OpHandshakeReply handshake reply
	OpHandshakeReply = int32(1)

	// OpHeartbeat heartbeat
	OpHeartbeat = int32(2)
	// OpHeartbeatReply heartbeat reply
	OpHeartbeatReply = int32(3)

	// OpSendMsg send message.
	OpSendMsg = int32(4)
	// OpSendMsgReply  send message reply
	OpSendMsgReply = int32(5)

	// OpDisconnectReply disconnect reply
	OpDisconnectReply = int32(6)

	// OpAuth auth connnect
	OpAuth = int32(7)
	// OpAuthReply auth connect reply
	OpAuthReply = int32(8)

	// OpRaw raw message
	OpRaw = int32(9)

	// OpProtoReady proto ready
	OpProtoReady = int32(10)
	// OpProtoFinish proto finish
	OpProtoFinish = int32(11)

	// OpChangeRoom change room
	OpChangeRoom = int32(12)
	// OpChangeRoomReply change room reply
	OpChangeRoomReply = int32(13)

	// OpSub subscribe operation
	OpSub = int32(14)
	// OpSubReply subscribe operation
	OpSubReply = int32(15)

	// OpUnsub unsubscribe operation
	OpUnsub = int32(16)
	// OpUnsubReply unsubscribe operation reply
	OpUnsubReply = int32(17)
)

复制代码

注意这个 OpSub = int32(14)

在 channel.go 里 , 这里有个方法定义, op 被使用为 watchOps 的键, 而值是一个空的 struct

// Watch watch a operation.
func (c *Channel) Watch(accepts ...int32) {
	c.mutex.Lock()
	for _, op := range accepts {
		c.watchOps[op] = struct{}{}
	}
	c.mutex.Unlock()
}
复制代码

这个 Watch 方法, 在 comet/operation.go 里被引用 这个方法 func (s *Server) Operate........ 用在 comet 服务的 server_tcp.go / server_websocket.go 中能看到详细用法( 不另加说明了, 代码很清楚表明了使用场景)

// Operate operate.
func (s *Server) Operate(ctx context.Context, p *model.Proto, ch *Channel, b *Bucket) error {
	switch p.Op {
	case model.OpChangeRoom:
		if err := b.ChangeRoom(string(p.Body), ch); err != nil {
			log.Errorf("b.ChangeRoom(%s) error(%v)", p.Body, err)
		}
		p.Op = model.OpChangeRoomReply
	case model.OpSub:
		if ops, err := strings.SplitInt32s(string(p.Body), ","); err == nil {
			ch.Watch(ops...)
		}
		p.Op = model.OpSubReply
	case model.OpUnsub:
		if ops, err := strings.SplitInt32s(string(p.Body), ","); err == nil {
			ch.UnWatch(ops...)
		}
		p.Op = model.OpUnsubReply
	default:
		// TODO ack ok&failed
		if err := s.Receive(ctx, ch.Mid, p); err != nil {
			log.Errorf("s.Report(%d) op:%d error(%v)", ch.Mid, p.Op, err)
		}
		p.Body = nil
	}
	return nil
}

复制代码

请注意这句

case model.OpSub:
		if ops, err := strings.SplitInt32s(string(p.Body), ","); err == nil {
			ch.Watch(ops...)
		}
复制代码

明显可见, ops 是从 p.Body 里, 通过 strings.SplitInt32s 对 "xxxx,xxxx,xxxx" 以 "," 拆分并转化为 int32 得到的.

那么, 什么内容以 "xxxx,xxxx,xxxx" 的形式存在 p.Body 里呢???

这个最终的小答案, 就由大家来寻找吧.

至少, 说明了 watchOps 这个 int32 并不是 /api/comet/grpc/operation.go 里定义的 operation

以上, 祝安康,愉快.

2020/02/20 tsingson 于 深圳.南山.小罗号口琴音乐中心

FZbuQ3y.png!web

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK