14

conntrack通信原理分析

 4 years ago
source link: http://blog.spoock.com/2020/01/18/conntrack/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

在Netfilter框架研究 这篇文章中说过,通过 cat /proc/net/nf_conntrack 或者是通过 conntrack -L -o extend 方式查看当前系统中的连接跟踪信息。如下所示:

ipv4 2 tcp 6 431916 ESTABLISHED src=172.22.44.167 dst=172.22.44.196 sport=44972 dport=18012 src=172.22.44.196 dst=172.22.44.167 sport=18012 dport=44972 [ASSURED] mark=0 zone=0 use=2

但是网络上大部分资料都是通过注册 NF_INET_LOCAL_IN 这样一个HOOK函数的方式来获取信息。既然在用户态可以直接获取conntrack连接跟踪信息,那么就不需要通过注册一个 NF_INET_LOCAL_IN 这样的函数来实现。

本文就是探究在用户态下如何实时获取连接跟踪信息。

代码分析

以 conntrack-go 为例来看看conntrack的实现原理。基于 first commit 这个commit来进行分析。首先分析程序的入口文件, conntrack-agent.go

// main flushes the kernel conntrack table once, then polls it every 50ms,
// printing each tracked IPv4 flow that the kernel reports.
func main() {
	// Open a netlink socket speaking the NETLINK_NETFILTER protocol.
	h, err := lib.NewHandle(unix.NETLINK_NETFILTER)
	if err != nil {
		log.Fatalln("failed to create Handle..ERROR:", err)
	}
	// Start from a clean slate: ask the kernel to drop all tracked connections.
	err = h.ConntrackTableFlush(lib.ConntrackTable)
	if err != nil {
		log.Fatalln("failed to flush conntrack table..ERROR:", err)
	}
	for {
		flows, err := h.ConntrackTableList(lib.ConntrackTable, lib.InetFamily(unix.AF_INET))
		if err == nil {
			// Ranging over an empty slice is a no-op, so no length guard is needed.
			for _, flow := range flows {
				fmt.Println(flow)
			}
		}
		// time.Sleep instead of <-time.After: After allocates a fresh timer
		// on every iteration of this endless polling loop.
		time.Sleep(50 * time.Millisecond)
	}
}

其实主要就是三个步骤.

h,err := lib.NewHandle(unix.NETLINK_NETFILTER)
h.ConntrackTableFlush(lib.ConntrackTable)
h.ConntrackTableList(lib.ConntrackTable, lib.InetFamily(unix.AF_INET))

NewHandle(unix.NETLINK_NETFILTER)

进入到 NewHandle(….) 函数内部

// NewHandle returns a netlink handle holding one socket per requested
// netlink family (defaulting to SupportedNlFamilies when none is given).
// Both namespace handles are None(), so sockets open in the current netns.
func NewHandle(nlFamilies ...int) (*Handle, error) {
	return newHandle(None(), None(), nlFamilies...)
}
// newHandle opens one netlink socket per requested family — falling back to
// SupportedNlFamilies when the caller supplies none — inside the namespace
// pair described by newNs/curNs, and returns a Handle owning those sockets.
func newHandle(newNs, curNs NsHandle, nlFamilies ...int) (*Handle, error) {
	families := nlFamilies
	if len(families) == 0 {
		families = SupportedNlFamilies
	}
	handle := &Handle{sockets: make(map[int]*SocketHandle)}
	for _, family := range families {
		sock, err := GetNetlinkSocketAt(newNs, curNs, family)
		if err != nil {
			return nil, err
		}
		handle.sockets[family] = &SocketHandle{Socket: sock}
	}
	return handle, nil
}

newNs和curNs用于切换network namespace(用newNs替换curNs,即在新的namespace中创建socket),但这对我们来说没有必要,所以两者都传入None(). 程序直接调用 GetNetlinkSocketAt

// GetNetlinkSocketAt opens a netlink socket in the network namespace newNs
// and positions the thread back into the network namespace specified by curNs,
// when done. If curNs is closed, the function derives the current namespace and
// moves back into it when done. If newNs is closed, the socket will be opened
// in the current network namespace.
func GetNetlinkSocketAt(newNs, curNs NsHandle, protocol int) (*NetlinkSocket, error) {
	// executeInNetns switches the calling thread into newNs; the returned
	// closure restores the previous namespace and must run before we return.
	c, err := executeInNetns(newNs, curNs)
	if err != nil {
		return nil, err
	}
	defer c()
	return getNetlinkSocket(protocol)
}

// getNetlinkSocket creates a raw AF_NETLINK socket for the given protocol
// (NETLINK_NETFILTER in this article's flow) and binds it, returning the
// wrapped descriptor. The namespace handling above is incidental — this is
// the function that actually produces the socket.
func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {
	sockFd, err := unix.Socket(unix.AF_NETLINK, unix.SOCK_RAW|unix.SOCK_CLOEXEC, protocol)
	if err != nil {
		return nil, err
	}
	sock := &NetlinkSocket{fd: int32(sockFd)}
	sock.lsa.Family = unix.AF_NETLINK
	if err = unix.Bind(sockFd, &sock.lsa); err != nil {
		// Binding failed: don't leak the descriptor.
		unix.Close(sockFd)
		return nil, err
	}
	return sock, nil
}

getNetlinkSocket 就是常见的socket创建方法.

fd, err := unix.Socket(unix.AF_NETLINK, unix.SOCK_RAW|unix.SOCK_CLOEXEC, protocol)
unix.Bind(fd, &s.lsa)

最后我们再回到 NewHandle

// 得到netfilter类型的socket fd
s, err := GetNetlinkSocketAt(newNs, curNs, f)

// 创建SocketHandle对象,并将socket属性赋值为socket fd
// SocketHandle contains the netlink socket and the associated
// sequence counter for a specific netlink family
type SocketHandle struct {
	Seq    uint32 // per-socket message sequence number, advanced atomically in Execute
	Socket *NetlinkSocket
}

// NetlinkSocket wraps a bound AF_NETLINK socket descriptor.
type NetlinkSocket struct {
	fd  int32                // raw fd; loaded atomically in Send
	lsa unix.SockaddrNetlink // local netlink address the fd is bound to
	sync.Mutex               // serializes a request/response cycle on a shared socket
}
h.sockets[f] = &SocketHandle{Socket: s}

ConntrackTableFlush(lib.ConntrackTable)

首先分析出 lib.ConntrackTable 的值.

// ConntrackTableType selects which conntrack table a request targets; its
// value becomes the high byte of the netlink message type.
type ConntrackTableType uint8
const (
	// ConntrackTable is the main connection-tracking table.
	ConntrackTable = ConntrackTableType(1)
	// ConntrackExpectTable is the table of expected connections.
	ConntrackExpectTable = ConntrackTableType(2)
)

接下来分析 ConntrackTableFlush 顾名思义. ConntrackTableFlush 就是清空连接跟踪表.

// ConntrackTableFlush sends an IPCTNL_MSG_CT_DELETE request with NLM_F_ACK,
// asking the kernel to clear every entry in the given conntrack table. Only
// the error is kept — a flush returns an acknowledgement, not data.
func (h *Handle) ConntrackTableFlush(table ConntrackTableType) error {
	req := h.newConntrackRequest(table, unix.AF_INET, IPCTNL_MSG_CT_DELETE, unix.NLM_F_ACK)
	_, err := req.Execute(unix.NETLINK_NETFILTER, 0)
	return err
}
// newConntrackRequest builds a netlink request whose message type packs the
// conntrack table id in the high byte and the operation in the low byte,
// then prepends the mandatory nfgenmsg netfilter header as the first payload.
func (h *Handle) newConntrackRequest(table ConntrackTableType, family InetFamily, operation, flags int) *NetlinkRequest {
	request := h.newNetlinkRequest((int(table)<<8)|operation, flags)
	header := &Nfgenmsg{
		NfgenFamily: uint8(family),
		Version:     NFNETLINK_V0,
		ResId:       0,
	}
	request.AddData(header)
	return request
}

req := h.newNetlinkRequest((int(table)<<8)|operation, flags) 是创建一个NetlinkRequest,至于此时各个参数的内容:

  1. table:1
  2. family: unix.AF_INET(值为2)
  3. operation:2
  4. flag: unix.NLM_F_ACK(值为4)

分析程序 newNetlinkRequest

// NetlinkRequest is a serializable netlink message: the standard nlmsghdr,
// typed payload parts, optional raw trailing bytes, and the pool of
// per-family sockets it may be executed against.
type NetlinkRequest struct {
	unix.NlMsghdr
	Data    []NetlinkRequestData // payload items appended via AddData, serialized in order
	RawData []byte               // pre-serialized bytes appended after Data
	Sockets map[int]*SocketHandle
}

// newNetlinkRequest builds a NetlinkRequest with a populated nlmsghdr.
// proto carries (table<<8)|operation for nfnetlink; flags are OR-ed with
// NLM_F_REQUEST, which must be set on every request message.
func (h *Handle) newNetlinkRequest(proto, flags int) *NetlinkRequest {
	// Do this so that package API still use nl package variable nextSeqNr
	if h.sockets == nil {
		return NewNetlinkRequest(proto, flags)
	}
	return &NetlinkRequest{
		// A standard netlink message header; the C equivalent would be:
		//   nlh->nlmsg_len   = NLMSG_SPACE(MAX_PAYLOAD);
		//   nlh->nlmsg_type  = proto;
		//   nlh->nlmsg_flags = NLM_F_REQUEST | flags;
		// Len here is only a placeholder — Serialize() recomputes it from
		// the final payload size before the message is sent.
		NlMsghdr: unix.NlMsghdr{
			Len:   uint32(unix.SizeofNlMsghdr),
			Type:  uint16(proto),
			Flags: unix.NLM_F_REQUEST | uint16(flags),
		},
		Sockets: h.sockets,
	}
}

按照 NETLINK(7) 中的说明,

nlmsg_type can be one of the standard message types: NLMSG_NOOP message is to be ignored, NLMSG_ERROR message signals an error and the payload contains an nlmsgerr structure, NLMSG_DONE message terminates a multipart message.

struct nlmsgerr {
    int error;            /* Negative errno or 0 for acknowledgements */
    struct nlmsghdr msg;  /* Message header that caused the error */

对于nlmsg_type来说,存在四种类型.每种类型对应的Int值分别是:

// referer:https://elixir.bootlin.com/linux/v4.7/source/include/uapi/linux/netlink.h#L95
#define NLMSG_NOOP		0x1	/* Nothing.		*/
#define NLMSG_ERROR		0x2	/* Error		*/
#define NLMSG_DONE		0x3	/* End of a dump	*/
#define NLMSG_OVERRUN	0x4	/* Data lost		*/

同时在 include/uapi/linux/netfilter/nfnetlink_conntrack.h 还定义了一些消息控制类型.如下所示:

enum cntl_msg_types {
	IPCTNL_MSG_CT_NEW,
	IPCTNL_MSG_CT_GET,
	IPCTNL_MSG_CT_DELETE,
	IPCTNL_MSG_CT_GET_CTRZERO,
	IPCTNL_MSG_CT_GET_STATS_CPU,
	IPCTNL_MSG_CT_GET_STATS,
	IPCTNL_MSG_CT_GET_DYING,
	IPCTNL_MSG_CT_GET_UNCONFIRMED,

	IPCTNL_MSG_MAX
};

本例中的 IPCTNL_MSG_CT_DELETE 值是2,对应于 cntl_msg_types 中的 IPCTNL_MSG_CT_DELETE
对于nlmsg_flags,则是:

NLM_F_REQUEST Must be set on all request messages. NLM_F_MULTI The message is part of a multipart mes‐ sage terminated by NLMSG_DONE. NLM_F_ACK Request for an acknowledgment on success. NLM_F_ECHO Echo this request. Additional flag bits for GET requests

对应的值是:

/* Flags values */

#define NLM_F_REQUEST		1	/* It is request message. 	*/
#define NLM_F_MULTI		2	/* Multipart message, terminated by NLMSG_DONE */
#define NLM_F_ACK		4	/* Reply with ack, with zero or error code */
#define NLM_F_ECHO		8	/* Echo this request 		*/
#define NLM_F_DUMP_INTR		16	/* Dump was inconsistent due to sequence change */

所以在本例中选择是1和4,即 NLM_F_REQUEST 和 NLM_F_ACK ,表示是一条请求信息,并且需要回复.

最后 newNetlinkRequest 成功执行,返回一个 NetlinkRequest 类型的结构体. 内容如下:

&NetlinkRequest{
		NlMsghdr: unix.NlMsghdr{
			Len:   uint32(unix.SizeofNlMsghdr),
			Type:  uint16(proto),
			Flags: unix.NLM_F_REQUEST | uint16(flags),
		},
		// 通过前面GetNetlinkSocketAt()函数得到的socket fd
		Sockets: h.sockets,
	}

回到 newConntrackRequest .

func (h *Handle) newConntrackRequest(table ConntrackTableType, family InetFamily, operation, flags int) *NetlinkRequest {
	// Create the Netlink request object
	req := h.newNetlinkRequest((int(table)<<8)|operation, flags)
	// Add the netfilter header
	msg := &Nfgenmsg{
		NfgenFamily: uint8(family),
		Version:     NFNETLINK_V0,
		ResId:       0,
	}
	req.AddData(msg)
	return req
}

func (req *NetlinkRequest) AddData(data NetlinkRequestData) {
	req.Data = append(req.Data, data)
}

type NetlinkRequest struct {
	unix.NlMsghdr
	Data    []NetlinkRequestData
	RawData []byte
	Sockets map[int]*SocketHandle
}
type NetlinkRequestData interface {
	Len() int
	Serialize() []byte
}

获得了 NetlinkRequest 结构体之后,调用 AddData() 方法,将数据填充到Data属性中.Data属性是一个 NetlinkRequestData 的接口.

当执行完毕 newConntrackRequest 之后,程序回到主函数 ConntrackTableFlush

func (h *Handle) ConntrackTableFlush(table ConntrackTableType) error {
	req := h.newConntrackRequest(table, unix.AF_INET, IPCTNL_MSG_CT_DELETE, unix.NLM_F_ACK)
	_, err := req.Execute(unix.NETLINK_NETFILTER, 0)
	return err
}

调用 NetlinkRequest 的 Execute() 方法.

// Execute the request against a the given sockType.
// Returns a list of netlink messages in serialized format, optionally filtered
// by resType.
func (req *NetlinkRequest) Execute(sockType int, resType uint16) ([][]byte, error) {
	var (
		s   *NetlinkSocket
		err error
	)

	if req.Sockets != nil {
		// Reuse the pooled socket created for this netlink family, if any.
		if sh, ok := req.Sockets[sockType]; ok {
			s = sh.Socket
			// Stamp the request with the next per-socket sequence number
			// (the first request on a fresh handle gets Seq == 1).
			req.Seq = atomic.AddUint32(&sh.Seq, 1)
		}
	}
	sharedSocket := s != nil

	if s == nil {
		// No pooled socket: open a throw-away one for this call.
		s, err = getNetlinkSocket(sockType)
		if err != nil {
			return nil, err
		}
		defer s.Close()
	} else {
		// Pooled sockets are shared; serialize the whole send/receive cycle.
		s.Lock()
		defer s.Unlock()
	}
	if err := s.Send(req); err != nil {
		return nil, err
	}

	pid, err := s.GetPid()
	if err != nil {
		return nil, err
	}

	var res [][]byte

done:
	for {
		msgs, err := s.Receive()
		if err != nil {
			return nil, err
		}
		for _, m := range msgs {
			// A reply must echo our sequence number; on a shared socket a
			// mismatch may belong to another request, so it is skipped.
			if m.Header.Seq != req.Seq {
				if sharedSocket {
					continue
				}
				return nil, fmt.Errorf("Wrong Seq nr %d, expected %d", m.Header.Seq, req.Seq)
			}
			// Replies must be addressed to our netlink port id.
			if m.Header.Pid != pid {
				return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
			}
			// NLMSG_DONE terminates a multipart dump.
			if m.Header.Type == unix.NLMSG_DONE {
				break done
			}
			// NLMSG_ERROR carries an nlmsgerr whose leading int32 is a
			// negative errno, or 0 for a plain acknowledgement (NLM_F_ACK).
			if m.Header.Type == unix.NLMSG_ERROR {
				native := NativeEndian()
				error := int32(native.Uint32(m.Data[0:4]))
				if error == 0 {
					break done
				}
				return nil, syscall.Errno(-error)
			}
			// Optionally keep only messages of the requested type.
			if resType != 0 && m.Header.Type != resType {
				continue
			}
			res = append(res, m.Data)
			// A message without NLM_F_MULTI is the last of the reply.
			if m.Header.Flags&unix.NLM_F_MULTI == 0 {
				break done
			}
		}
	}
	return res, nil
}

其中的关键代码是 s.Send(req) . s 是NetlinkSocket,req是包含了请求数据的对象.

type NetlinkSocket struct {
	fd  int32
	lsa unix.SockaddrNetlink
	sync.Mutex
}

type NetlinkRequest struct {
	unix.NlMsghdr
	Data    []NetlinkRequestData
	RawData []byte
	Sockets map[int]*SocketHandle
}

// Send serializes the request and writes it to the kernel via sendto(2),
// failing fast when the socket has already been closed (fd < 0).
func (s *NetlinkSocket) Send(request *NetlinkRequest) error {
	// The fd is loaded atomically because Close may race with Send.
	fd := int(atomic.LoadInt32(&s.fd))
	if fd < 0 {
		return fmt.Errorf("Send called on a closed socket")
	}
	return unix.Sendto(fd, request.Serialize(), 0, &s.lsa)
}

此时的 NetlinkSocket 是:

iYNf63v.png!web

NetlinkRequest 是:

byeEbqr.png!web

NetlinkSocket 的 Send() 方法本质上就是调用 unix.Sendto(fd, request.Serialize(), 0, &s.lsa) 发送数据,只是在发送前需要对request的数据进行序列化,即调用 NetlinkRequest 的 Serialize 方法.

// Serialize the Netlink Request into a byte array
// Wire layout: nlmsghdr | each Data item in order | RawData. The header's
// Len field is fixed up to the total message length before it is copied out.
func (req *NetlinkRequest) Serialize() []byte {
	length := unix.SizeofNlMsghdr
	dataBytes := make([][]byte, len(req.Data))
	for i, data := range req.Data {
		// For conntrack requests Data holds a single *Nfgenmsg.
		dataBytes[i] = data.Serialize()
		length = length + len(dataBytes[i])
	}
	length += len(req.RawData)

	req.Len = uint32(length)
	b := make([]byte, length)
	// Reinterpret the embedded NlMsghdr (first field of req) as raw bytes.
	hdr := (*(*[unix.SizeofNlMsghdr]byte)(unsafe.Pointer(req)))[:]
	next := unix.SizeofNlMsghdr
	copy(b[0:next], hdr)
	for _, data := range dataBytes {
		// copy lowers to memmove; the original moved one byte per iteration.
		next += copy(b[next:], data)
	}
	// Add the raw data if any
	if len(req.RawData) > 0 {
		copy(b[next:length], req.RawData)
	}
	return b
}

所以 data.Serialize() 本质上就是调用 Nfgenmsg的Serialize() 方法.

// Nfgenmsg is the nfnetlink header that follows the nlmsghdr in every
// netfilter message: address family, nfnetlink version, and a resource id.
type Nfgenmsg struct {
	NfgenFamily uint8
	Version     uint8
	ResId       uint16 // big endian
}

// Len returns the wire size of the header (SizeofNfgenmsg bytes).
func (msg *Nfgenmsg) Len() int {
	return SizeofNfgenmsg
}

// DeserializeNfgenmsg reinterprets the first SizeofNfgenmsg bytes of b as an
// Nfgenmsg without copying — the returned struct aliases b's storage.
func DeserializeNfgenmsg(b []byte) *Nfgenmsg {
	return (*Nfgenmsg)(unsafe.Pointer(&b[0:SizeofNfgenmsg][0]))
}

// Serialize reinterprets the struct's memory as a byte slice (no field-wise
// encoding; the slice is a view over the struct itself).
func (msg *Nfgenmsg) Serialize() []byte {
	return (*(*[SizeofNfgenmsg]byte)(unsafe.Pointer(msg)))[:]
}

回到主函数 Serialize

b := make([]byte, length)
hdr := (*(*[unix.SizeofNlMsghdr]byte)(unsafe.Pointer(req)))[:]
next := unix.SizeofNlMsghdr
copy(b[0:next], hdr)
for _, data := range dataBytes {
	for _, dataByte := range data {
		b[next] = dataByte
		next = next + 1
	}
}

将req转换为长度为SizeofNlMsghdr的byte.最后将req和req.Data全部填充到b中 (b := make([]byte, length)) , 最终调用 unix.Sendto(fd, request.Serialize(), 0, &s.lsa); 发送数据.

程序继续执行,回到Execute函数. 调用 Sendto() 发送数据之后,接下来就是接收数据.

done:
	for {
	    // 获取数据
		msgs, err := s.Receive()
		if err != nil {
			return nil, err
		}
		for _, m := range msgs {
		    // 检测序列号,判断两者是否一致
			if m.Header.Seq != req.Seq {
				if sharedSocket {
					continue
				}
				return nil, fmt.Errorf("Wrong Seq nr %d, expected %d", m.Header.Seq, req.Seq)
			}
			// 检测pid是否一直
			if m.Header.Pid != pid {
				return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
			}
			// 通过Netlink Message的头类型判断数据是什么类型
			if m.Header.Type == unix.NLMSG_DONE {  // 数据发送完毕
				break done
			}
			if m.Header.Type == unix.NLMSG_ERROR {   // 错误信息
				native := NativeEndian()
				error := int32(native.Uint32(m.Data[0:4]))
				if error == 0 {
					break done
				}
				return nil, syscall.Errno(-error)
			}
			if resType != 0 && m.Header.Type != resType {
				continue
			}
			// 添加数据
			res = append(res, m.Data)
			// 如果flags和NLM_F_MULTI并级是0,则同样结束遍历.
			if m.Header.Flags&unix.NLM_F_MULTI == 0 {
				break done
			}
		}
	}
	return res, nil

unix.NLMSG_ERROR 中,如果确定前面error信息的前面4个字节是0,则同样表示请求结束.

if m.Header.Type == unix.NLMSG_ERROR {   // 错误信息
    native := NativeEndian()
    error := int32(native.Uint32(m.Data[0:4]))
    if error == 0 {
        break done
    }
    return nil, syscall.Errno(-error)
}

我们观察此时我们的请求数据.

所以,虽然是NLMSG_ERROR类型的响应包,但是只要error的前面4个字节是0,则表示没有错误,成功执行.

至此,我们的 h.ConntrackTableFlush(lib.ConntrackTable) 就执行完毕, 本质上就是发送IPCTNL_MSG_CT_DELETE请求,清空连接跟踪表.

ConntrackTableList(lib.ConntrackTable, lib.InetFamily(unix.AF_INET))

当执行完毕 ConntrackTableFlush(lib.ConntrackTable) 之后 , 程序就会执行 ConntrackTableList(lib.ConntrackTable, lib.InetFamily(unix.AF_INET)) 获取连接跟踪的数据.

// ConntrackTableList dumps the given conntrack table for the given address
// family and deserializes every returned record into a *ConntrackFlow.
func (h *Handle) ConntrackTableList(table ConntrackTableType, family InetFamily) ([]*ConntrackFlow, error) {
	raw, err := h.dumpConntrackTable(table, family)
	if err != nil {
		return nil, err
	}

	// Deserialize all the flows, one raw record at a time.
	var flows []*ConntrackFlow
	for _, record := range raw {
		flows = append(flows, parseRawData(record))
	}

	return flows, nil
}

// dumpConntrackTable issues IPCTNL_MSG_CT_GET with NLM_F_DUMP, returning one
// raw payload per tracked connection (unlike the flush, which only ACKs).
func (h *Handle) dumpConntrackTable(table ConntrackTableType, family InetFamily) ([][]byte, error) {
	req := h.newConntrackRequest(table, family, IPCTNL_MSG_CT_GET, unix.NLM_F_DUMP)
	return req.Execute(unix.NETLINK_NETFILTER, 0)
}

可以看到,其实ConntrackTableList和前面分析的ConntrackTableFlush 整个流程基本相同. 不同之处在于:

  1. ConntrackTableList 是调用 newConntrackRequest(table, family, IPCTNL_MSG_CT_GET, unix.NLM_F_DUMP) 用于获取信息,而在 ConntrackTableFlush 则是调用 h.newConntrackRequest(table, unix.AF_INET, IPCTNL_MSG_CT_DELETE, unix.NLM_F_ACK) 清除连接跟踪表.
  2. ConntrackTableList 中 res, err := h.dumpConntrackTable(table, family) 需要得到请求之后的返回值,即连接跟踪的数据.而在 ConntrackTableFlush 中,则是 _, err := req.Execute(unix.NETLINK_NETFILTER, 0) ,丢弃了返回值.因为清空连接跟踪表时仅仅只关心清空操作是否成功执行,并不关心返回数据,其实也没有数据返回.

接下来就是主要分析 ConntrackTableList 对返回数据的解析部分

parseRawData(dataRaw)

// parseRawData decodes one conntrack record: the nfgenmsg header, then a
// sequence of (possibly nested) netlink attributes describing the forward
// and reverse flow tuples, their counters, and the connection mark.
func parseRawData(data []byte) *ConntrackFlow {
	s := &ConntrackFlow{}
	var proto uint8
	// First there is the Nfgenmsg header
	// consume only the family field
	reader := bytes.NewReader(data)
	binary.Read(reader, NativeEndian(), &s.FamilyType)

	// skip rest of the Netfilter header
	reader.Seek(3, seekCurrent)
	// The message structure is the following:
	// <len, NLA_F_NESTED|CTA_TUPLE_ORIG> 4 bytes
	// <len, NLA_F_NESTED|CTA_TUPLE_IP> 4 bytes
	// flow information of the forward flow
	// <len, NLA_F_NESTED|CTA_TUPLE_REPLY> 4 bytes
	// <len, NLA_F_NESTED|CTA_TUPLE_IP> 4 bytes
	// flow information of the reverse flow
	for reader.Len() > 0 {
		if nested, t, l := parseNfAttrTL(reader); nested {
			switch t {
			case CTA_TUPLE_ORIG:
				if nested, t, _ = parseNfAttrTL(reader); nested && t == CTA_TUPLE_IP {
					proto = parseIpTuple(reader, &s.Forward)
				}
			case CTA_TUPLE_REPLY:
				if nested, t, _ = parseNfAttrTL(reader); nested && t == CTA_TUPLE_IP {
					parseIpTuple(reader, &s.Reverse)
				} else {
					// Header not recognized skip it
					reader.Seek(int64(l), seekCurrent)
				}
			case CTA_COUNTERS_ORIG:
				s.Forward.Bytes, s.Forward.Packets = parseByteAndPacketCounters(reader)
			case CTA_COUNTERS_REPLY:
				s.Reverse.Bytes, s.Reverse.Packets = parseByteAndPacketCounters(reader)
			}
		}
	}
	// NOTE(review): the fixed 64-byte (TCP) / 16-byte (UDP) seeks assume a
	// specific attribute layout ahead of CTA_MARK — confirm against the
	// kernel version actually producing these dumps.
	if proto == TCP_PROTO {
		reader.Seek(64, seekCurrent)
		_, t, _, v := parseNfAttrTLV(reader)
		if t == CTA_MARK {
			// NOTE(review): only v[3] (low byte of the big-endian mark) is
			// kept, so marks > 255 would be truncated — verify intended.
			s.Mark = uint32(v[3])
		}
	} else if proto == UDP_PROTO {
		reader.Seek(16, seekCurrent)
		_, t, _, v := parseNfAttrTLV(reader)
		if t == CTA_MARK {
			s.Mark = uint32(v[3])
		}
	}
	return s
}

skip header

reader := bytes.NewReader(data)
binary.Read(reader, NativeEndian(), &s.FamilyType)

// skip rest of the Netfilter header
reader.Seek(3, seekCurrent)

将数据变为Reader对象之后,跳过前面4个字节.注释解释为跳过Netfilter header. 因为在Netlink Data中的前面4个字节一般都是代表nfgenmsg信息

/* General form of address family dependent message.
 */
struct nfgenmsg {
	__u8  nfgen_family;		/* AF_xxx */  // 在本例中是获取IPv4的连接跟踪信息,所以就是NFPROTO_IPV4,即2
	__u8  version;		/* nfnetlink version */ // 一般情况是NFNETLINK_V0,即0
	__be16    res_id;		/* resource id */  //一般情况是0
};

enum {
	NFPROTO_UNSPEC =  0,
	NFPROTO_INET   =  1,
	NFPROTO_IPV4   =  2,
	NFPROTO_ARP    =  3,
	NFPROTO_NETDEV =  5,
	NFPROTO_BRIDGE =  7,
	NFPROTO_IPV6   = 10,
	NFPROTO_DECNET = 12,
	NFPROTO_NUMPROTO,
};

parseNfAttrTL(reader)

程序解析reader,获取数据.

// parseNfAttrTL reads a netlink attribute (nlattr) header: a 2-byte native-
// endian total length followed by a 2-byte type. It reports whether the
// NLA_F_NESTED flag was set, the type with flag bits masked off, and the
// payload length (total minus the attribute header size).
// The named result is `length` rather than `len` so the builtin isn't shadowed.
func parseNfAttrTL(r *bytes.Reader) (isNested bool, attrType, length uint16) {
	binary.Read(r, NativeEndian(), &length)
	length -= SizeofNfattr

	binary.Read(r, NativeEndian(), &attrType)
	// The high bit marks a nested attribute; masking with NLA_F_NESTED-1
	// strips it to recover the bare ctattr_type value.
	isNested = (attrType & NLA_F_NESTED) == NLA_F_NESTED
	attrType = attrType & (NLA_F_NESTED - 1)

	return isNested, attrType, length
}

此时,解析得到的数据如下:

rmUnq2n.png!web

attrType的类型定义如下:

enum ctattr_type {
	CTA_UNSPEC,
	CTA_TUPLE_ORIG,
	CTA_TUPLE_REPLY,
	CTA_STATUS,
	CTA_PROTOINFO,
	CTA_HELP,
	CTA_NAT_SRC,
#define CTA_NAT	CTA_NAT_SRC	/* backwards compatibility */
	CTA_TIMEOUT,
	CTA_MARK,
	CTA_COUNTERS_ORIG,
	CTA_COUNTERS_REPLY,
	CTA_USE,
	CTA_ID,
	CTA_NAT_DST,
	CTA_TUPLE_MASTER,
	CTA_SEQ_ADJ_ORIG,
	CTA_NAT_SEQ_ADJ_ORIG	= CTA_SEQ_ADJ_ORIG,
	CTA_SEQ_ADJ_REPLY,
	CTA_NAT_SEQ_ADJ_REPLY	= CTA_SEQ_ADJ_REPLY,
	CTA_SECMARK,		/* obsolete */
	CTA_ZONE,
	CTA_SECCTX,
	CTA_TIMESTAMP,
	CTA_MARK_MASK,
	CTA_LABELS,
	CTA_LABELS_MASK,
	__CTA_MAX
};

当前值为1,则对应于 CTA_TUPLE_ORIG 类型. 对应的处理代码如下:

case CTA_TUPLE_ORIG:
	if nested, t, _ = parseNfAttrTL(reader); nested && t == CTA_TUPLE_IP {
		proto = parseIpTuple(reader, &s.Forward)
	}

又经过一次 parseNfAttrTL(reader) 解析,此时返回值如下:

3iYjEjM.png!web

满足条件之后,程序进入到 parseIpTuple(reader, &s.Forward) 继续解析数据.

// This method parse the ip tuple structure
// The message structure is the following:
// <len, [CTA_IP_V4_SRC|CTA_IP_V6_SRC], 16 bytes for the IP>
// <len, [CTA_IP_V4_DST|CTA_IP_V6_DST], 16 bytes for the IP>
// <len, NLA_F_NESTED|nl.CTA_TUPLE_PROTO, 1 byte for the protocol, 3 bytes of padding>
// <len, CTA_PROTO_SRC_PORT, 2 bytes for the source port, 2 bytes of padding>
// <len, CTA_PROTO_DST_PORT, 2 bytes for the source port, 2 bytes of padding>
func parseIpTuple(reader *bytes.Reader, tpl *ipTuple) uint8 {
	for i := 0; i < 2; i++ {
		_, t, _, v := parseNfAttrTLV(reader)
	  /*
		referer:include/uapi/linux/netfilter.h
        enum ctattr_ip {
			CTA_IP_UNSPEC,
			CTA_IP_V4_SRC,
			CTA_IP_V4_DST,
			CTA_IP_V6_SRC,
			CTA_IP_V6_DST,
			__CTA_IP_MAX
      };
      */
        // Parse the source and destination addresses.
		switch t {
		case CTA_IP_V4_SRC, CTA_IP_V6_SRC:
			tpl.SrcIP = v
		case CTA_IP_V4_DST, CTA_IP_V6_DST:
			tpl.DstIP = v
		}
	}
	// Skip the next 4 bytes  nl.NLA_F_NESTED|nl.CTA_TUPLE_PROTO
	reader.Seek(4, seekCurrent)
	_, t, _, v := parseNfAttrTLV(reader)
	// Parse the layer-4 protocol attribute.
	/*
	referer: include/uapi/linux/netfilter/nfnetlink_conntrack.h
	enum ctattr_l4proto {
	CTA_PROTO_UNSPEC,
	CTA_PROTO_NUM,
	CTA_PROTO_SRC_PORT,
	CTA_PROTO_DST_PORT,
	CTA_PROTO_ICMP_ID,
	CTA_PROTO_ICMP_TYPE,
	CTA_PROTO_ICMP_CODE,
	CTA_PROTO_ICMPV6_ID,
	CTA_PROTO_ICMPV6_TYPE,
	CTA_PROTO_ICMPV6_CODE,
	__CTA_PROTO_MAX
};
	*/
	if t == CTA_PROTO_NUM {
	    // Protocol number from the first payload byte (6 = TCP in the
	    // article's worked example).
		tpl.Protocol = uint8(v[0])
	}
	// Skip some padding 3 bytes
	reader.Seek(3, seekCurrent)
	for i := 0; i < 2; i++ {
	    // Same attribute scheme for the source and destination ports
	    // (parseBERaw16 reads them as big-endian 16-bit values).
		_, t, _ := parseNfAttrTL(reader)
		switch t {
		case CTA_PROTO_SRC_PORT:
			parseBERaw16(reader, &tpl.SrcPort)
		case CTA_PROTO_DST_PORT:
			parseBERaw16(reader, &tpl.DstPort)
		}
		// Skip some padding 2 byte
		reader.Seek(2, seekCurrent)
	}
	return tpl.Protocol
}

// parseNfAttrTLV reads one attribute header via parseNfAttrTL, then the
// payload bytes that follow, returning nested flag, type, length and value.
func parseNfAttrTLV(r *bytes.Reader) (isNested bool, attrType, len uint16, value []byte) {
	isNested, attrType, len = parseNfAttrTL(r)

	value = make([]byte, len)
	binary.Read(r, binary.BigEndian, &value)
	return isNested, attrType, len, value
}
func parseNfAttrTL(r *bytes.Reader) (isNested bool, attrType, len uint16) {
 	binary.Read(r, NativeEndian(), &len)
	len -= SizeofNfattr

	binary.Read(r, NativeEndian(), &attrType)
	isNested = (attrType & NLA_F_NESTED) == NLA_F_NESTED
	attrType = attrType & (NLA_F_NESTED - 1)

	return isNested, attrType, len
}

按照上面的这种方式将数据解析完毕之后,就需要将信息返回.

// ipTuple holds one direction of a tracked flow: endpoints, ports,
// protocol, and the byte/packet counters parsed for that direction.
type ipTuple struct {
	Bytes    uint64
	DstIP    net.IP
	DstPort  uint16
	Packets  uint64
	Protocol uint8 // IP protocol number (6 = TCP, 17 = UDP)
	SrcIP    net.IP
	SrcPort  uint16
}

// ConntrackFlow is one parsed conntrack record: address family, the
// forward and reverse tuples, and the connection mark.
type ConntrackFlow struct {
	FamilyType uint8
	Forward    ipTuple
	Reverse    ipTuple
	Mark       uint32
}
s := &ConntrackFlow{}
if proto == TCP_PROTO {
	reader.Seek(64, seekCurrent)
	_, t, _, v := parseNfAttrTLV(reader)
	if t == CTA_MARK {
		s.Mark = uint32(v[3])
	}
} else if proto == UDP_PROTO {
	reader.Seek(16, seekCurrent)
	_, t, _, v := parseNfAttrTLV(reader)
	if t == CTA_MARK {
		s.Mark = uint32(v[3])
	}
}

解析得到MARK值,填充到 ConntrackFlow 结构体中.

output

解析得到数据之后,接下来就是输出结果.

for {
	flows, err := h.ConntrackTableList(lib.ConntrackTable, lib.InetFamily(unix.AF_INET))
	if err == nil {
		if len(flows) != 0 {
			for _, flow := range flows {
				fmt.Println(flow)
			}
		}
	}

最终输出结果.内容如下:

tcp	6 src=IP1 dst=IP2 sport=33508 dport=17250 packets=175 bytes=23397	src=IP2 dst=IP1 sport=17250 dport=33508 packets=214 bytes=28663 mark=0
udp	17 src=IP3 dst=IP4 sport=5353 dport=5353 packets=5 bytes=469	src=IP4 dst=IP3 sport=5353 dport=5353 packets=0 bytes=0 mark=0

总结

花了大量的时间来分析整个获取连接跟踪信息的过程,收获非常大.但是由于精力和时间的关系,对于最后解析netfilter返回的数据没有详解,等有时间再详细说明吧.


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK