16

B站微服务框架Kratos详细教程(7)- 数据库

 3 years ago
source link: https://blog.csdn.net/uisoul/article/details/111267349
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

B站微服务框架Kratos详细教程(7)- 数据库

由于kratos使用了wire依赖注入框架,开始使用前,建议先了解相关教程:依赖注入wire使用详解

打开数据库配置文件configs/db.toml(即下文NewDB中通过paladin.Get("db.toml")读取的文件;如果你的项目中该文件名为mysql.toml,请保持两处文件名一致),修改为自己的服务器配置:

# MySQL client settings; NewDB decodes this [Client] table into sql.Config.
[Client]
	# Address of the MySQL server.
	addr = "127.0.0.1:3306"
	# Main DSN; replace {user}, {password} and {database} with real values.
	dsn = "{user}:{password}@tcp(127.0.0.1:3306)/{database}?timeout=1s&readTimeout=1s&writeTimeout=1s&parseTime=true&loc=Local&charset=utf8mb4,utf8"
	# Optional DSNs preferred for read operations; a single entry is enough.
	readDSN = ["{user}:{password}@tcp(127.0.0.2:3306)/{database}?timeout=1s&readTimeout=1s&writeTimeout=1s&parseTime=true&loc=Local&charset=utf8mb4,utf8"]
	# Maximum number of connections in the pool.
	active = 20
	# Number of idle connections kept in the pool.
	idle = 10
	# Timeouts: idle connection, query, exec and transaction respectively.
	idleTimeout ="4h"
	queryTimeout = "200ms"
	execTimeout = "300ms"
	tranTimeout = "400ms"

在该配置文件中我们可以配置mysql的读和写的dsn、连接地址addr、连接池的闲置连接数idle、最大连接数active以及各类超时。

如果配置了readDSN,在进行读操作的时候会优先使用readDSN的连接,readDSN可以只配一个地址。

打开文件internal/dao/db.go

package dao

import (
	"context"

	"dbserver/internal/model"
	"github.com/go-kratos/kratos/pkg/conf/paladin"
	"github.com/go-kratos/kratos/pkg/database/sql"
)

// NewDB builds the MySQL connection pool from the "Client" table of db.toml.
//
// It returns the pool together with a cleanup function that closes it. When
// the configuration cannot be loaded or decoded, the error is returned and
// both db and cf are nil.
func NewDB() (db *sql.DB, cf func(), err error) {
	var (
		conf sql.Config
		file paladin.TOML
	)
	// Load db.toml, then decode its [Client] table into the pool configuration.
	if err = paladin.Get("db.toml").Unmarshal(&file); err != nil {
		return nil, nil, err
	}
	if err = file.Get("Client").UnmarshalTOML(&conf); err != nil {
		return nil, nil, err
	}
	// NewMySQL initialises the connection pool object.
	db = sql.NewMySQL(&conf)
	cf = func() { db.Close() }
	return db, cf, nil
}

// RawArticle is the placeholder for loading an article from the database by
// id. It performs no query yet and always returns a nil article and nil error.
func (d *dao) RawArticle(ctx context.Context, id int64) (art *model.Article, err error) {
	// TODO: read the article from the database.
	return nil, nil
}

打开文件internal/dao/dao.go
在该文件中的New方法接收外部数据库连接池对象db *sql.DB, 也可以像官方文档说的, 直接在dao中初始化
这里涉及到依赖注入, 具体可以查看di/wire_gen.go文件
依赖注入相关可查看这篇文章: 依赖注入wire使用详解

package dao

import (
	"context"
	"time"

	"dbserver/internal/model"
	"github.com/go-kratos/kratos/pkg/cache/memcache"
	"github.com/go-kratos/kratos/pkg/cache/redis"
	"github.com/go-kratos/kratos/pkg/conf/paladin"
	"github.com/go-kratos/kratos/pkg/database/sql"
	"github.com/go-kratos/kratos/pkg/sync/pipeline/fanout"
	xtime "github.com/go-kratos/kratos/pkg/time"

	"github.com/google/wire"
)
// Provider is the wire provider set that declares the dao-layer constructors
// (New, NewDB, NewRedis, NewMC) for dependency injection.
var Provider = wire.NewSet(New, NewDB, NewRedis, NewMC)

//go:generate kratos tool genbts
// Dao is the data-access interface exposed by this package.
//
// NOTE(review): the "bts:" comment on Article looks like an annotation read by
// the genbts generator named in the go:generate line above; keep it verbatim.
type Dao interface {
	// Close releases the resources owned by the dao.
	Close()
	// Ping is the health-check hook (the dao implementation in this file
	// always returns nil).
	Ping(ctx context.Context) (err error)
	// bts: -nullcache=&model.Article{ID:-1} -check_null_code=$!=nil&&$.ID==-1
	Article(c context.Context, id int64) (*model.Article, error)
}

// dao is the concrete Dao implementation; it bundles the storage clients
// passed to New together with values derived from application.toml.
type dao struct {
	db         *sql.DB            // database connection pool
	redis      *redis.Redis       // Redis client
	mc         *memcache.Memcache // Memcache client
	cache      *fanout.Fanout     // fanout instance created in newDao, closed by Close
	demoExpire int32              // DemoExpire from application.toml, in whole seconds
}

// New assembles the Dao from the injected Redis, Memcache and database
// connection pools.
//
// It returns the Dao, a cleanup function and any initialisation error. On
// error both d and cf are nil.
func New(r *redis.Redis, mc *memcache.Memcache, db *sql.DB) (d Dao, cf func(), err error) {
	impl, cleanup, err := newDao(r, mc, db)
	if err != nil {
		// Return a literal nil: converting a nil *dao to the Dao interface
		// would yield a non-nil interface value holding a nil pointer.
		return nil, nil, err
	}
	return impl, cleanup, nil
}

// newDao initialises the dao from the given clients and from the DemoExpire
// setting in application.toml.
//
// It returns the dao and its Close method as the cleanup function; when the
// configuration cannot be decoded it returns the error with nil d and cf.
func newDao(r *redis.Redis, mc *memcache.Memcache, db *sql.DB) (d *dao, cf func(), err error) {
	var appConf struct {
		DemoExpire xtime.Duration
	}
	if err = paladin.Get("application.toml").UnmarshalTOML(&appConf); err != nil {
		return nil, nil, err
	}
	// The expiry is stored as whole seconds.
	expireSeconds := time.Duration(appConf.DemoExpire) / time.Second
	d = &dao{
		db:         db, // the official docs initialise the pool right here instead
		redis:      r,
		mc:         mc,
		cache:      fanout.New("cache"),
		demoExpire: int32(expireSeconds),
	}
	return d, d.Close, nil
}

// Close closes the dao's fanout cache. The db, redis and mc clients are not
// closed by this method.
func (d *dao) Close() {
	d.cache.Close()
}

// Ping is the health-check hook required by Dao. It currently performs no
// check and always returns nil.
func (d *dao) Ping(ctx context.Context) (err error) {
	return nil
}

创建测试数据库

-- Create and select the demo database used by this tutorial.
create database kratos_demo;
use kratos_demo;

-- users: demo table read and written by the dao user methods.
CREATE TABLE `users` (
  -- Auto-increment primary key.
  `uid` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `nickname` varchar(100) NOT NULL DEFAULT '' COMMENT '昵称',
  `age` smallint(5) unsigned NOT NULL COMMENT '年龄',
  -- Unix timestamp (seconds) of the last update; written by AddUser and UpdateUser.
  `uptime` int(10) unsigned NOT NULL DEFAULT '0',
  -- Unix timestamp (seconds) of creation; written by AddUser.
  `addtime` int(10) unsigned NOT NULL DEFAULT '0',
  PRIMARY KEY (`uid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

model/model.go文件中添加结构体

// User mirrors one row of the `users` table.
type User struct {
	Uid      int32  // uid column (primary key)
	Nickname string // nickname column
	Age      int32  // age column
	Uptime   int32  // uptime column: Unix seconds of the last update
	Addtime  int32  // addtime column: Unix seconds of creation
}

dao/dao.go中新增四个接口

// Dao is the data-access interface, extended here with four user methods.
type Dao interface {
	Close()
	Ping(ctx context.Context) (err error)
	// bts: -nullcache=&model.Article{ID:-1} -check_null_code=$!=nil&&$.ID==-1
	Article(c context.Context, id int64) (*model.Article, error)
	// Newly added user methods.
	// AddUser inserts a user with the given nickname and age and returns the stored record.
	AddUser(c context.Context, nickname string, age int32) (user *model.User, err error)
	// UpdateUser changes the nickname and age of user uid and returns the number of affected rows.
	UpdateUser(c context.Context, uid int64, nickname string, age int32) (row int64, err error)
	// GetUser returns the user with the given uid.
	GetUser(c context.Context, uid int64) (user *model.User, err error)
	// GetUserList returns every user in the table.
	GetUserList(c context.Context) (userlist []*model.User, err error)
}

新增文件dao/dao.user.go, 实现四个接口

package dao

import (
	"context"
	"dbserver/internal/model"
	"fmt"
	"github.com/go-kratos/kratos/pkg/database/sql"
	"github.com/go-kratos/kratos/pkg/log"
	"time"
)
// AddUser inserts a new row into `users` with the given nickname and age,
// stamping both uptime and addtime with the current Unix time.
//
// It returns the stored user, including the auto-increment uid reported by the
// driver. On failure the error is logged and returned with a nil user.
func (d *dao) AddUser(c context.Context, nickname string, age int32) (user *model.User, err error) {
	// Sprintf is a no-op here (the string has no verbs); it is kept so the
	// file's fmt import remains in use.
	querySql := fmt.Sprintf("INSERT INTO `users`(uid,nickname,age,uptime,addtime) VALUES(null,?,?,?,?);")

	timenow := time.Now().Unix()
	res, err := d.db.Exec(c, querySql, nickname, age, timenow, timenow)
	if err != nil {
		log.Error("db.Exec(%s) error(%v)", querySql, err)
		return nil, err
	}
	// LastInsertId yields an int64 while model.User.Uid is int32, so the value
	// is converted explicitly; its error is checked instead of discarded.
	uid, err := res.LastInsertId()
	if err != nil {
		log.Error("res.LastInsertId() error(%v)", err)
		return nil, err
	}
	user = &model.User{
		Uid:      int32(uid),
		Nickname: nickname,
		Age:      age,
		Uptime:   int32(timenow),
		Addtime:  int32(timenow),
	}
	return user, nil
}
// UpdateUser sets the nickname and age of the user identified by uid and
// refreshes its uptime with the current Unix time.
//
// It returns the number of affected rows. Errors from the statement or from
// reading the affected-row count are logged and returned with a zero count.
func (d *dao) UpdateUser(c context.Context, uid int64, nickname string, age int32) (row int64, err error) {
	// Sprintf is a no-op here (the string has no verbs); it is kept so the
	// file's fmt import remains in use.
	querySql := fmt.Sprintf("UPDATE `users` SET nickname=?,age=?,uptime=? WHERE uid=?;")

	timenow := time.Now().Unix()
	res, err := d.db.Exec(c, querySql, nickname, age, timenow, uid)
	if err != nil {
		log.Error("db.Exec(%s) error(%v)", querySql, err)
		return 0, err
	}

	// Propagate a RowsAffected failure instead of masking it with a nil error.
	if row, err = res.RowsAffected(); err != nil {
		log.Error("res.RowsAffected() error(%v)", err)
		return 0, err
	}
	return row, nil
}
// GetUser loads the user with the given uid.
//
// When no row matches, it returns a zero-valued user and a nil error. Any
// other error is logged and returned with a nil user.
func (d *dao) GetUser(c context.Context, uid int64) (user *model.User, err error) {
	// Columns are listed explicitly so the positional Scan below does not
	// depend on the table's column order. Sprintf is a no-op (no verbs); it
	// is kept so the file's fmt import remains in use.
	querySql := fmt.Sprintf("SELECT uid,nickname,age,uptime,addtime FROM `users` WHERE uid=?;")

	user = new(model.User)
	err = d.db.QueryRow(c, querySql, uid).Scan(&user.Uid, &user.Nickname, &user.Age, &user.Uptime, &user.Addtime)
	if err != nil && err != sql.ErrNoRows {
		log.Error("d.QueryRow error(%v)", err)
		// Do not hand back a half-initialised user together with an error.
		return nil, err
	}
	return user, nil
}
// GetUserList returns every row of `users`.
//
// The result is a non-nil, possibly empty slice on success. Query, scan and
// iteration errors are logged and returned with a nil slice.
func (d *dao) GetUserList(c context.Context) (userlist []*model.User, err error) {
	// Columns are listed explicitly so the positional Scan below does not
	// depend on the table's column order. Sprintf is a no-op (no verbs); it
	// is kept so the file's fmt import remains in use.
	querySql := fmt.Sprintf("SELECT uid,nickname,age,uptime,addtime FROM `users`;")
	rows, err := d.db.Query(c, querySql)
	if err != nil {
		log.Error("query  error(%v)", err)
		return nil, err
	}
	defer rows.Close()

	// Start from an empty, non-nil slice so callers that JSON-encode the
	// result get "[]" rather than "null" for an empty table.
	userlist = make([]*model.User, 0)
	for rows.Next() {
		user := new(model.User)
		if err = rows.Scan(&user.Uid, &user.Nickname, &user.Age, &user.Uptime, &user.Addtime); err != nil {
			log.Error("scan demo log error(%v)", err)
			return nil, err
		}
		userlist = append(userlist, user)
	}
	// Next returning false may mean an iteration error rather than the end of
	// the result set; report it instead of returning a truncated list.
	if err = rows.Err(); err != nil {
		log.Error("rows.Err() error(%v)", err)
		return nil, err
	}
	return userlist, nil
}

打开api/api.proto, 增加测试http接口:

syntax = "proto3";

import "github.com/gogo/protobuf/gogoproto/gogo.proto";
import "google/protobuf/empty.proto";
import "google/api/annotations.proto";

// package 命名使用 {appid}.{version} 的方式, version 形如 v1, v2 ..
package demo.service.v1;

// NOTE: 最后请删除这些无用的注释 (゜-゜)つロ

option go_package = "api";
option (gogoproto.goproto_getters_all) = false;

// Demo is the demo service; the RPCs carrying a google.api.http option are
// also exposed over HTTP GET at the given path.
service Demo {
  rpc Ping(.google.protobuf.Empty) returns (.google.protobuf.Empty);
  rpc SayHello(HelloReq) returns (.google.protobuf.Empty);
  rpc SayHelloURL(HelloReq) returns (HelloResp) {
    option (google.api.http) = {
      get: "/say_hello"
    };
  };
  // AddUser creates a user from nickname and age.
  rpc AddUser(AddReq) returns (Response) {
    option (google.api.http) = {
      get: "/adduser"
    };
  };
  // UpdateUser changes the nickname and age of the user identified by uid.
  rpc UpdateUser(UpdateReq) returns (Response) {
    option (google.api.http) = {
      get: "/updateuser"
    };
  };
  // GetUser returns the user identified by uid.
  rpc GetUser(GetReq) returns (Response) {
    option (google.api.http) = {
      get: "/getuser"
    };
  };
  // GetUserList returns all users.
  rpc GetUserList(.google.protobuf.Empty) returns (Response) {
    option (google.api.http) = {
      get: "/getuserlist"
    };
  };
}

// HelloReq is the request of SayHello and SayHelloURL.
message HelloReq {
  // name is bound from the "name" form parameter and is required.
  string name = 1 [(gogoproto.moretags) = 'form:"name" validate:"required"'];
}

// HelloResp is the response of SayHelloURL.
message HelloResp {
  // Content is serialised under the JSON key "content".
  string Content = 1 [(gogoproto.jsontag) = 'content'];
}

// AddReq is the request of AddUser; both fields are bound from form
// parameters and are required.
message AddReq {
  string nickname = 1 [(gogoproto.moretags) = 'form:"nickname" validate:"required"'];
  int32 age = 2 [(gogoproto.moretags) = 'form:"age" validate:"required"'];
}

// UpdateReq is the request of UpdateUser: uid selects the user, nickname and
// age are the new values. All fields are bound from form parameters and are
// required.
message UpdateReq {
  int64 uid = 1 [(gogoproto.moretags) = 'form:"uid" validate:"required"'];
  string nickname = 2 [(gogoproto.moretags) = 'form:"nickname" validate:"required"'];
  int32 age = 3 [(gogoproto.moretags) = 'form:"age" validate:"required"'];
}

// GetReq is the request of GetUser.
message GetReq {
  // uid is bound from the "uid" form parameter and is required.
  int64 uid = 1 [(gogoproto.moretags) = 'form:"uid" validate:"required"'];
}

// Response is the shared reply of the user RPCs.
message Response {
  // Content carries the result as plain text (the service fills it with JSON
  // text or a status message); serialised under the JSON key "content".
  string Content = 1 [(gogoproto.jsontag) = 'content'];
}

打开internal/service/service.go, 增加接口实现:


// AddUser creates a user from the request's nickname and age and returns the
// stored record JSON-encoded in Response.Content.
//
// A dao or encoding failure is returned as err with a nil reply.
func (s *Service) AddUser(ctx context.Context, req *pb.AddReq) (reply *pb.Response, err error) {
	fmt.Printf("AddUser: %s, %d", req.Nickname, req.Age)
	user, err := s.dao.AddUser(ctx, req.Nickname, req.Age)
	if err != nil {
		fmt.Printf("AddUser %s, %d Error", req.Nickname, req.Age)
		return nil, err
	}
	// Surface an encoding failure instead of replying with empty content.
	res, err := json.Marshal(user)
	if err != nil {
		return nil, err
	}
	return &pb.Response{Content: string(res)}, nil
}

// UpdateUser applies the request's nickname and age to the user identified by
// req.Uid and reports the number of affected rows in Response.Content.
//
// A dao failure is returned as err with a nil reply.
func (s *Service) UpdateUser(ctx context.Context, req *pb.UpdateReq) (reply *pb.Response, err error) {
	fmt.Printf("UpdateUser: %s, %d", req.Nickname, req.Age)
	affected, err := s.dao.UpdateUser(ctx, req.Uid, req.Nickname, req.Age)
	if err != nil {
		fmt.Printf("UpdateUser %s, %d Error", req.Nickname, req.Age)
		return nil, err
	}
	content := fmt.Sprintf("更新行数: %d", affected)
	return &pb.Response{Content: content}, nil
}

// GetUser looks up the user identified by req.Uid and returns it JSON-encoded
// in Response.Content.
//
// A dao or encoding failure is returned as err with a nil reply.
func (s *Service) GetUser(ctx context.Context, req *pb.GetReq) (reply *pb.Response, err error) {
	fmt.Printf("GetUser: %d", req.Uid)
	user, err := s.dao.GetUser(ctx, req.Uid)
	if err != nil {
		// req.Uid is an int64, so it must be formatted with %d, not %s.
		fmt.Printf("GetUser %d Error", req.Uid)
		return nil, err
	}
	// Surface an encoding failure instead of replying with empty content.
	res, err := json.Marshal(user)
	if err != nil {
		return nil, err
	}
	return &pb.Response{Content: string(res)}, nil
}

// GetUserList returns all users JSON-encoded as an array in Response.Content.
//
// A dao or encoding failure is returned as err with a nil reply.
func (s *Service) GetUserList(ctx context.Context, req *empty.Empty) (reply *pb.Response, err error) {
	fmt.Printf("GetUserList")
	userlist, err := s.dao.GetUserList(ctx)
	if err != nil {
		fmt.Printf("GetUserList Error")
		return nil, err
	}
	// Surface an encoding failure instead of replying with empty content.
	res, err := json.Marshal(userlist)
	if err != nil {
		return nil, err
	}
	return &pb.Response{Content: string(res)}, nil
}

进入api目录, 重新生成pb文件

kratos tool protoc

运行项目:

kratos run

打开浏览器:

添加用户:

http://localhost:8000/adduser?nickname=soul&age=22

这里为了方便输出, 我直接返回字符串, 实际项目中应该要在pb文件中定义对应的结构体返回给客户端

返回信息:

{
    "code": 0,
    "message": "0",
    "ttl": 1,
    "data": {
        "content": "{\"Uid\":3,\"Nickname\":\"soul\",\"Age\":22,\"Uptime\":1608102563,\"Addtime\":1608102563}"
    }
}

更新用户信息:

http://localhost:8000/updateuser?uid=3&nickname=soul&age=22

返回信息:

{
    "code": 0,
    "message": "0",
    "ttl": 1,
    "data": {
        "content": "更新行数: 1"
    }
}

获取单个用户信息:

http://localhost:8000/getuser?uid=3

返回信息:

{
    "code": 0,
    "message": "0",
    "ttl": 1,
    "data": {
        "content": "{\"Uid\":3,\"Nickname\":\"soul\",\"Age\":22,\"Uptime\":1608102563,\"Addtime\":1608102563}"
    }
}

获取用户列表:

http://localhost:8000/getuserlist

返回信息:

{
    "code": 0,
    "message": "0",
    "ttl": 1,
    "data": {
        "content": "[{\"Uid\":1,\"Nickname\":\"soul\",\"Age\":22,\"Uptime\":1608102449,\"Addtime\":1608102449},{\"Uid\":2,\"Nickname\":\"soul\",\"Age\":22,\"Uptime\":1608102514,\"Addtime\":1608102514},{\"Uid\":3,\"Nickname\":\"soul\",\"Age\":22,\"Uptime\":1608102563,\"Addtime\":1608102563}]"
    }
}

到此, 已实现基本的数据库操作

kratos/pkg/database/sql包支持事务操作,具体操作示例如下:

开启一个事务:

// Begin takes the request context (c) and returns the transaction together
// with an error; there is no Error field on the returned transaction.
tx, err := d.db.Begin(c)
if err != nil {
    log.Error("db begin transcation failed, err=%+v", err)
    return
}

在事务中执行语句:

// Run the statement inside the transaction.
res, err := tx.Exec(_demoSQL, did)
if err != nil {
    return
}
// RowsAffected returns (int64, error); both values must be received.
rows, err := res.RowsAffected()
if err != nil {
    return
}

提交事务:

// Commit returns the error directly.
if err = tx.Commit(); err != nil {
    log.Error("db commit transcation failed, err=%+v", err)
}

回滚事务:

// Rollback returns the error directly; log that error (the original logged an
// undefined rollbackErr variable).
if err = tx.Rollback(); err != nil {
    log.Error("db rollback failed, err=%+v", err)
}

事务相关就不写例子啦, 各位可自行尝试.

本项目示例源码: https://download.csdn.net/download/uisoul/13704134


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK