35

以太坊控制台源码分析

 5 years ago
source link: https://blog.csdn.net/TurkeyCock/article/details/89295196?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

最近有网友提到以太坊控制台的代码看不太明白,抽了点时间整理了一下。

当我们通过 geth console 或者 geth attach 与节点交互的时候,输入的命令是如何被处理的呢?看下面这张流程图就明白了:

VRNvYvv.png!web
  • 命令行编辑器Liner等待用户输入命令
  • JSRE使用一个名为scheduler的通道(chan)接收命令
  • JSRE把命令发送给Javascript解释器Otto处理
  • Otto中预加载了web3.js,执行对应的函数并通过provider发送RPC请求
  • Web3 provider被设置为一个Bridge模块,接收请求并转发给RCP Client
  • RPC Client通过全双工管道和RPC Server通信,完成RPC调用
  • 将RPC调用结果输出到命令行

可以看到,流程还是很清晰的,但是涉及到很多模块。实际上,这些模块都被包含在Console的数据结构之中:

type Console struct {
	client   *rpc.Client
	jsre     *jsre.JSRE
	prompt   string
	prompter UserPrompter
	histPath string
	history  []string
	printer  io.Writer
}

下面会对这些模块一一进行介绍。

1. Liner:带历史记录的命令行编辑器

既然是控制台,那么显然需要一个命令行编辑器来输入命令并打印结果。以太坊使用的是一个开源的命令编辑器Liner,github地址: https://github.com/peterh/liner

这个命令编辑器还是挺强大的,除了基本的交互以外,还支持历史记录和自动补全。我们来看一个最简单的使用示例:

// 创建liner实例
line := liner.NewLiner()
defer line.Close()

// 设置自动补全处理函数
line.SetCompleter(func(line string) {
  ...
})

// 打印提示,接收用户输入
name, err := line.Prompt("What is your name? ")
if err == nil {
	log.Print("Got: ", name)
	// 添加历史记录
	line.AppendHistory(name)
}

当然,为了可扩展性,以太坊在外面做了一层封装。默认情况下会创建一个terminalPrompter,内部其实还是直接调用liner,具体可以参见console/prompter.go。

另外,思考一个问题: 当我们在控制台输入 eth.getT 然后按Tab键时,会自动帮我们补全为 eth.getTransaction ,这是怎么做到的?

实际上,可以通过调用Javascript的getOwnPropertyNames()函数获取对象的所有属性和方法,然后选出匹配项加入自动补全列表中。具体代码实现参见internal/jsre/completion.go以及internal/jsre/pretty.go。

2. Otto:JavaScript解释器

为了方便理解,我们先介绍一下Otto,稍后再介绍JSRE。

Otto是一个Go语言实现的JavaScript解释器,并且可以很方便地实现Javascript和Go之间的相互调用。我们来看一下具体用法,非常简单:

  • 创建otto实例:
import (
    "github.com/robertkrimen/otto"
)

vm := otto.New()
  • 设置一个Javascript变量的值:
vm.Set("a", 88)
vm.Set("b", "hello")
  • 获取一个Javascript变量值:
value, err := vm.Get("a")
{
	value, _ := value.ToInteger()
}
  • 执行一段Javascript代码:
vm.Run(`
	console.log(b + a); // hello88
`)
  • 执行一个Javascript表达式并获取返回值:
value, _ := vm.Run("b.length")
{
	value, _ := value.ToInteger()
}
  • 执行一个Javascript函数并获取返回值:
value, _ := vm.Call(`[ 1, 2, 3 ].concat`, nil, 4, 5, 6, "abc")
{
	value, _ := value.Export() // [ 1, 2, 3, 4, 5, 6, "abc" ]
}
  • 设置一个Go函数(可以在Javascript中调用):
vm.Set("twoPlus", func(call otto.FunctionCall) otto.Value {
    right, _ := call.Argument(0).ToInteger()
    result, _ := vm.ToValue(2 + right)
    return result
})
  • 在Javascript中调用Go函数:
result, _ = vm.Run(`
    result = twoPlus(2.0); // 4
`)
{
	result, _ := result.ToInteger()
}
  • 编译执行.js文件:
code, _ := ioutil.ReadFile("./test.js")
script, _ := vm.Compile("test.js", code)
vm.Run(script)

但是,Otto没有提供Web开发中经常使用到的setTimeout()和setInterval()等函数,它的文档里提到这是因为这些函数不是ECMA-262标准的一部分,并且需要增加事件循环。如果你想使用这些函数,需要自己实现。实际上,以太坊中使用time.AfterFunc()实现了这些函数,并通过vm.Set()设置到了Javascript中,具体可以参见internal/jsre/jsre.go。

3. JSRE:实现事件循环

所谓事件循环,其实就是一个消息队列,在Go中一般是通过通道(chan)来实现。

命令行接收到用户输入的命令后,会调用JSRE的Evaluate()函数,我们来看看该函数的具体实现:

func (re *JSRE) Evaluate(code string, w io.Writer) error {
	var fail error

	re.Do(func(vm *otto.Otto) {
		val, err := vm.Run(code)
		if err != nil {
			prettyError(vm, err, w)
		} else {
			prettyPrint(vm, val, w)
		}
		fmt.Fprintln(w)
	})
	return fail
}

可以发现,会调用Do()方法把该命令送入事件循环。同时还需要传入一个回调函数,当事件循环执行到该命令时,会调用该函数。在回调函数中,通过Otto的Run()函数执行该命令,然后把执行结果打印到命令行中。

我们再来看一下Do()的具体实现:

func (re *JSRE) Do(fn func(*otto.Otto)) {
	done := make(chan bool)
	req := &evalReq{fn, done}
	re.evalQueue <- req
	<-done
}

代码很简单,先往evalQueue通道中送入一个请求,然后等待被调度执行。

接下来我们就来看看事件循环的实现,也就是JSRE中最为核心的runEventLoop()函数:

func (re *JSRE) runEventLoop() {
	vm := otto.New()
	...
	vm.Set("_setTimeout", setTimeout)
	vm.Set("_setInterval", setInterval)
	...
	for {
		select {
		case timer := <-ready:
			...
			_, err := vm.Call(`Function.call.call`, nil, arguments...)
			...
		case req := <-re.evalQueue:
			req.fn(vm)
			close(req.done)
			...
		case waitForCallbacks = <-re.stopEventLoop:
			...
		}
	}
	...
}

首先创建Otto实例,然后把setTimeout()/setInterval()这些函数设置进去。上一节我们提到过,Otto默认没有提供这些函数,需要自己实现。接着就是一个for-select循环了,主要就是监听3个通道:

  • timer:处理延时请求,时间到了以后通过Otto的Call()函数执行命令
  • evalQueue:处理非延时请求,调用回调函数立即执行
  • stopEventLoop:退出事件循环

4. web3.js和bridge

web3.js是一个Javascript库,提供了一些方便的API供前端开发使用,代码位于internal/jsre/deps/web3.js。

需要注意的是,如果你想修改web3.js,直接修改该文件的内容是不生效的,需要先通过go-bindata生成一个bindata.go文件,然后再编译以太坊。具体来说需要使用下面两行命令:

go-bindata -nometadata -pkg deps -o bindata.go bignumber.js web3.js
gofmt -w -s bindata.go

创建Web3对象时需要提供一个provider,通过provider的send()或者sendAsync()函数可以发起RPC请求。在控制台应用场景下,我们不需要真正发起HTTP请求,只需要在进程内(InProc)通信就可以了。因此,JSRE中设置了一个名为jeth的provider,同时把它的send()和sendAsync()函数绑定到一个bridge对象的Send()函数上。

那么,web3.js是怎么被加载进JSRE中的呢?又是如何跟bridge对象完成绑定的呢?实际上,这是在Console模块的init()函数中完成的,参见console/console.go(省略部分不相关代码):

func (c *Console) init(preload []string) error {
	// 创建bridge对象
	bridge := newBridge(c.client, c.prompter, c.printer)
	// 创建jeth对象
	c.jsre.Set("jeth", struct{}{})
	jethObj, _ := c.jsre.Get("jeth")
	// 绑定send()/sendAsync()到bridge.Send()
	jethObj.Object().Set("send", bridge.Send)
	jethObj.Object().Set("sendAsync", bridge.Send)

	// 替换console的打印函数
	consoleObj, _ := c.jsre.Get("console")
	consoleObj.Object().Set("log", c.consoleOutput)
	consoleObj.Object().Set("error", c.consoleOutput)
	
	// 加载bignumber.js
	c.jsre.Compile("bignumber.js", jsre.BigNumber_JS)
	// 加载web3.js
	c.jsre.Compile("web3.js", jsre.Web3_JS)
	c.jsre.Run("var Web3 = require('web3');")
	// 创建Web3对象,设置jeth为provider
	c.jsre.Run("var web3 = new Web3(jeth);")
	...
	// 创建我们熟悉的eth和personal对象
	flatten := "var eth = web3.eth; var personal = web3.personal; "
	...
	c.jsre.Run(flatten)
	...
}

可以看到,这里会编译加载bignumber.js和web3.js,创建Web3对象,设置jeth为provider,同时把send()/sendAsync()绑定到bridge的Send()函数上。另外,还会创建我们熟悉的eth和personal对象,并替换掉console对象的log()和error()函数(输出到命令行中)。

接下来,我们就来看看bridge对象是如何发起RPC请求的,代码位于console/bridge.go:

func (b *bridge) Send(call otto.FunctionCall) (response otto.Value) {
	// 获取Javascript请求参数
	JSON, _ := call.Otto.Object("JSON")
	reqVal, err := JSON.Call("stringify", call.Argument(0))
	...
	// 生成Go中的请求对象
	dec = json.NewDecoder(strings.NewReader(rawReq))
	reqs = make([]jsonrpcCall, 1)
	dec.Decode(&reqs[0])
	...
	// 通过RPC Client发起RPC请求
	var result json.RawMessage
	err = b.client.Call(&result, req.Method, req.Params...)
	...
	// 解析执行结果
	resultVal, err := JSON.Call("parse", string(result))
	...
	// 返回执行结果
	response, _ = resps.Get("0")
	return response
}

可以发现,主要就是通过调用RPC Client的Call()函数完成RPC请求,然后解析并返回执行结果。

另外,上面的reqs是一个数组,实际上是可以支持批量发送请求的,不过这个不是重点,在此略过。

5. RPC Client

RPC Client是真正发起RPC调用的模块,对端的RPC Server会处理请求并返回执行结果。

我们来看一看RPC Client的创建过程,代码位于rpc/inproc.go中:

func DialInProc(handler *Server) *Client {
	initctx := context.Background()
	c, _ := newClient(initctx, func(context.Context) (net.Conn, error) {
		p1, p2 := net.Pipe()
		go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions)
		return p2, nil
	})
	return c
}

可以看出,关键之处在于创建了一对全双工管道p1和p2。然后启动了一个线程作为RPC Server,通过管道通信,服务端使用p1,客户端使用p2。

Go语言中的net库提供了全双工管道的支持,具体来说,每对管道中包含10个通道(chan),参见下面的示意图:

7f6jQrf.png!web

大概解释一下:Rx表示接收数据,Tx表示发送数据。

当我们需要发起请求时,往wrTx中写入请求数据,然后从wrRx中读取执行结果。

当我们需要处理请求是,从rdRx中读取请求数据,处理完毕后,把执行结果写入rdTx。

如果需要关闭本地管道,则向done通道中写入数据,同时也可以查询对端的管道是否关闭。

6. RPC Server

RPC Server是真正处理RPC请求的模块,内部通过ServerCodec对象完成具体的处理工作。

ServerCodec是一个接口,由于需要处理JSON RPC,上一节我们通过NewJSONCodec()创建了它的一个实例,代码位于rpc/json.go。

不知道大家有没有过这样一个疑问: 我们发起JSON RPC的时候指定的函数名是 eth_sendTransaction ,但是以太坊源码中好像搜不到这个函数啊? 那么是怎么找到对应的处理函数的呢?

实际上,RPC Server在读取请求参数的时候偷偷做了处理,把 eth_sendTransaction 一分为二, eth 作为namespace, sendTransaction 作为method,具体代码参见rpc/server.go和rpc/json.go:

func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error) {
	reqs, batch, err := codec.ReadRequestHeaders()
	...
}

func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, Error) {
	...
	return parseRequest(incomingMsg)
}

func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
	...
	// 把请求的Method一分为二
	elems := strings.Split(in.Method, serviceMethodSeparator)
	...
	if len(in.Payload) == 0 {
		return []rpcRequest{{service: elems[0], method: elems[1], id: ∈.Id}}, false, nil
	}

	return []rpcRequest{{service: elems[0], method: elems[1], id: ∈.Id, params: in.Payload}}, false, nil
}

到这里,读过我之前写的以太坊RPC源码分析的朋友应该都明白了,接下来就是根据namespace和method调用对应的API就可以了。以 eth_sendTransaction 为例,对应的配置位于internal/ethapi/backend.go:

{
	Namespace: "eth",
	Version:   "1.0",
	Service:   NewPublicTransactionPoolAPI(apiBackend, nonceLock),
	Public:    true,
}

对应的API函数位于internal/ethapi/api.go:

func (s *PublicTransactionPoolAPI) SendTransaction(ctx context.Context, args SendTxArgs) (common.Hash, error) {
	...
}

这里还有一个疑问: eth_sendTransaction 中的函数名的首字母是小写的s,这里的API函数的首字母是大写的S,这是怎么匹配上的呢?

实际上,在注册系统API的时候完成了这项映射工作,具体参见rpc/server.go:

func (s *Server) RegisterName(name string, rcvr interface{}) error {
	...
	methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)
	...
}

func suitableCallbacks(rcvr reflect.Value, typ reflect.Type) (callbacks, subscriptions) {
	...
	for m := 0; m < typ.NumMethod(); m++ {
		...
		mname := formatName(method.Name)
	...
}

// formatName will convert to first character to lower case
func formatName(name string) string {
	ret := []rune(name)
	if len(ret) > 0 {
		ret[0] = unicode.ToLower(ret[0])
	}
	return string(ret)
}

7. 把所有知识串联到一起

看到这里,相信大家应该对控制台的整个流程有了一个非常清晰的把握。本文之所以没有一上来就分析入口代码,然后一路向下,主要是担心大家会湮没在代码的细节中,无法在更高的维度上看清各个模块之间的关联。

当然,出于完整性考虑,我们也在这里分析一下入口代码,方便大家把所有知识串联到一起。

当我们在运行 geth console 命令时,会执行cmd/geth/main.go中的consoleCommand:

func init() {
	...
	consoleCommand
	...
}

该命令对应的处理函数是cmd/geth/consolecmd.go的localConsole():

func localConsole(ctx *cli.Context) error {
	node := makeFullNode(ctx)
	startNode(ctx, node)
	defer node.Stop()

	client, err := node.Attach()
	...

	console, err := console.New(config)
	defer console.Stop(false)

	...
	console.Welcome()
	console.Interactive()

	return nil
}

主要做了下面4件事情:

  • 启动一个新节点并attach上去
  • 创建console实例
  • 打印欢迎信息
  • 进入交互模式

首先看一下Attach()函数,代码位于node/node.go:

func (n *Node) Attach() (*rpc.Client, error) {
	...
	return rpc.DialInProc(n.inprocHandler), nil
}

这个函数之前分析过,会创建一个RPC Client。

第二步就是创建Console实例,在第一节我们看过Console的数据结构,其中包含了RPC Client、JSRE、命令行编辑器、history等实例。

第三步打印欢迎信息,这个没啥说的。

最后一步执行console.Interactive(),等待和处理用户输入。我们来看一下这个函数:

func (c *Console) Interactive() {
	...
	go func() {
		for {
			// 接收用户输入
      line, err := c.prompter.PromptInput(<-scheduler)
      ...
      // 把命令送入scheduler通道
      scheduler <- line
		}
	}
	...
	for {
    ...
    select {
    // 从scheduler通道取出命令
    case line, ok := <-scheduler:
    	...
    	// 送入JSRE执行
    	c.Evaluate(input)
	}
	...
}

首先会启动一个新线程,通过Liner获取用户输入。当用户输入一条命令后,将命令送入scheduler通道。

在当前线程中,通过for-select不断从scheduler通道中取出命令,然后送入JSRE执行。

至此,以太坊控制台的整个流程就全部打通了,如果你再回头看一眼开头的那张框架图,相信一定会有不一样的感觉。如有疑问,欢迎给我留言。

更多文章欢迎关注“鑫鑫点灯”专栏: https://blog.csdn.net/turkeycock

或关注飞久微信公众号: uIfYruR.jpg!web

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK