此页面所有软件内容、截图、价格、介绍等均来源于互联网,地址均为第三方提供,请谨慎下载。



eRPC

eRPC 是一个高效、可扩展且简单易用的 RPC 框架。

适用于 RPC、微服务、点对点长连接、IM 和游戏等领域。

安装

  • go vesion ≥ 1.11

  • install

GO111MODULE=on go get -u -v -insecure github.com/andeya/erpc/v7
  • import
import "github.com/andeya/erpc/v7"

特性

  • 使用 peer 为 serverclient 提供相同的 API 封装
  • 提供多层抽象,如:
    • peer
    • session/socket
    • router
    • handle/context
    • message
    • protocol
    • codec
    • transfer filter
    • plugin
  • 支持平滑重启和关闭
  • 兼容 HTTP 的消息格式:
    • HeaderBody 两部分组成
    • Header 包含与 HTTP header 格式相同的 metadata
    • Body 支持类似 Content Type 的自定义编解码器,已经实现的:
      • Protobuf
      • Thrift
      • JSON
      • XML
      • Form
      • Plain
    • 支持 push、call-reply 和更多的消息类型
  • 支持自定义消息协议,并提供了一些常见实现:
    • rawproto - 默认的高性能二进制协议
    • jsonproto - JSON 消息协议
    • pbproto - Ptotobuf 消息协议
    • thriftproto - Thrift 消息协议
    • httproto - HTTP 消息协议
  • 可优化的高性能传输层
    • 使用 Non-block socket 和 I/O 多路复用技术
    • 支持设置套接字 I/O 的缓冲区大小
    • 支持设置读取消息的大小(如果超过则断开连接)
    • 支持控制连接的文件描述符
  • 支持多种网络类型:
    • tcp
    • tcp4
    • tcp6
    • unix
    • unixpacket
    • kcp
    • quic
    • 其他
      • websocket
      • evio
  • 提供丰富的插件埋点,并已实现:
    • auth
    • binder
    • heartbeat
    • ignorecase(service method)
    • overloader
    • proxy(for unknown service method)
    • secure
  • 强大灵活的日志系统:
    • 详细的日志信息,支持打印输入和输出详细信息
    • 支持设置慢操作警报阈值
    • 支持自定义实现日志组件
  • 客户端会话支持在断开连接后自动重拨

性能测试

自测

  • 一个服务端与一个客户端进程,在同一台机器上运行

  • CPU: Intel Xeon E312xx (Sandy Bridge) 16 cores 2.53GHz

  • Memory: 16G

  • OS: Linux 2.6.32-696.16.1.el6.centos.plus.x86_64, CentOS 6.4

  • Go: 1.9.2

  • 信息大小: 581 bytes

  • 信息编码:protobuf

  • 发送 1000000 条信息

  • erpc

  • erpc/socket

对比测试

More Detail

  • CPU耗时火焰图 erpc/socket

svg file

  • 堆栈信息火焰图 erpc/socket

svg file

代码示例

server.go

package main

import (
	"fmt"
	"time"

	"github.com/andeya/erpc/v7"
)

func main() {
	defer erpc.FlushLogger()
	// graceful
	go erpc.GraceSignal()

	// server peer
	srv := erpc.NewPeer(erpc.PeerConfig{
		CountTime:   true,
		ListenPort:  9090,
		PrintDetail: true,
	})
	// srv.SetTLSConfig(erpc.GenerateTLSConfigForServer())

	// router
	srv.RouteCall(new(Math))

	// broadcast per 5s
	go func() {
		for {
			time.Sleep(time.Second * 5)
			srv.RangeSession(func(sess erpc.Session) bool {
				sess.Push(
					"/push/status",
					fmt.Sprintf("this is a broadcast, server time: %v", time.Now()),
				)
				return true
			})
		}
	}()

	// listen and serve
	srv.ListenAndServe()
}

// Math handler
type Math struct {
	erpc.CallCtx
}

// Add handles addition request
func (m *Math) Add(arg *[]int) (int, *erpc.Status) {
	// test meta
	erpc.Infof("author: %s", m.PeekMeta("author"))
	// add
	var r int
	for _, a := range *arg {
		r  = a
	}
	// response
	return r, nil
}

client.go

package main

import (
	"time"

	"github.com/andeya/erpc/v7"
)

func main() {
	defer erpc.SetLoggerLevel("ERROR")()

	cli := erpc.NewPeer(erpc.PeerConfig{})
	defer cli.Close()
	// cli.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})

	cli.RoutePush(new(Push))

	sess, stat := cli.Dial(":9090")
	if !stat.OK() {
		erpc.Fatalf("%v", stat)
	}

	var result int
	stat = sess.Call("/math/add",
		[]int{1, 2, 3, 4, 5},
		&result,
		erpc.WithAddMeta("author", "andeya"),
	).Status()
	if !stat.OK() {
		erpc.Fatalf("%v", stat)
	}
	erpc.Printf("result: %d", result)
	erpc.Printf("Wait 10 seconds to receive the push...")
  time.Sleep(time.Second * 10)
}

// Push push handler
type Push struct {
  erpc.PushCtx
}

// Push handles '/push/status' message
func (p *Push) Status(arg *string) *erpc.Status {
  erpc.Printf("%s", *arg)
  return nil
}

更多示例

用法

NOTE:

  • 最好设置读包时大小限制: SetReadLimit
  • 默认读包时大小限制为 1 GB

Peer端点(服务端或客户端)示例

// Start a server
var peer1 = erpc.NewPeer(erpc.PeerConfig{
ListenPort: 9090, // for server role
})
peer1.Listen()

...

// Start a client
var peer2 = erpc.NewPeer(erpc.PeerConfig{})
var sess, err = peer2.Dial("127.0.0.1:8080")

自带ServiceMethod映射规则

  • 结构体或方法名称到服务方法名称的默认映射(HTTPServiceMethodMapper):

    • AaBb -> /aa_bb
    • ABcXYz -> /abc_xyz
    • Aa__Bb -> /aa_bb
    • aa__bb -> /aa_bb
    • ABC__XYZ -> /abc_xyz
    • Aa_Bb -> /aa/bb
    • aa_bb -> /aa/bb<365>
    • ABC_XYZ -> /abc/xyz
    erpc.SetServiceMethodMapper(erpc.HTTPServiceMethodMapper)
  • 结构体或方法名称到服务方法名称的映射(RPCServiceMethodMapper):

    • AaBb -> AaBb
    • ABcXYz -> ABcXYz
    • Aa__Bb -> Aa_Bb
    • aa__bb -> aa_bb
    • ABC__XYZ -> ABC_XYZ
    • Aa_Bb -> Aa.Bb
    • aa_bb -> aa.bb
    • ABC_XYZ -> ABC.XYZ
    erpc.SetServiceMethodMapper(erpc.RPCServiceMethodMapper)

Call-Struct 接口模版

type Aaa struct {
    erpc.CallCtx
}
func (x *Aaa) XxZz(arg *<T>) (<T>, *erpc.Status) {
    ...
    return r, nil
}
  • 注册到根路由:
// register the call route
// HTTP mapping: /aaa/xx_zz
// RPC mapping: Aaa.XxZz
peer.RouteCall(new(Aaa))

// or register the call route
// HTTP mapping: /xx_zz
// RPC mapping: XxZz
peer.RouteCallFunc((*Aaa).XxZz)

Call-Function 接口模板

func XxZz(ctx erpc.CallCtx, arg *<T>) (<T>, *erpc.Status) {
    ...
    return r, nil
}
  • 注册到根路由:
// register the call route
// HTTP mapping: /xx_zz
// RPC mapping: XxZz
peer.RouteCallFunc(XxZz)

Push-Struct 接口模板

type Bbb struct {
    erpc.PushCtx
}
func (b *Bbb) YyZz(arg *<T>) *erpc.Status {
    ...
    return nil
}
  • 注册到根路由:
// register the push handler
// HTTP mapping: /bbb/yy_zz
// RPC mapping: Bbb.YyZz
peer.RoutePush(new(Bbb))

// or register the push handler
// HTTP mapping: /yy_zz
// RPC mapping: YyZz
peer.RoutePushFunc((*Bbb).YyZz)

Push-Function 接口模板

// YyZz register the handler
func YyZz(ctx erpc.PushCtx, arg *<T>) *erpc.Status {
    ...
    return nil
}
  • 注册到根路由:
// register the push handler
// HTTP mapping: /yy_zz
// RPC mapping: YyZz
peer.RoutePushFunc(YyZz)

Unknown-Call-Function 接口模板

func XxxUnknownCall (ctx erpc.UnknownCallCtx) (interface{}, *erpc.Status) {
    ...
    return r, nil
}
  • 注册到根路由:
// register the unknown pull route: /*
peer.SetUnknownCall(XxxUnknownCall)

Unknown-Push-Function 接口模板

func XxxUnknownPush(ctx erpc.UnknownPushCtx) *erpc.Status {
    ...
    return nil
}
  • 注册到根路由:
// register the unknown push route: /*
peer.SetUnknownPush(XxxUnknownPush)

插件示例

// NewIgnoreCase Returns a ignoreCase plugin.
func NewIgnoreCase() *ignoreCase {
    return &ignoreCase{}
}

type ignoreCase struct{}

var (
    _ erpc.PostReadCallHeaderPlugin = new(ignoreCase)
    _ erpc.PostReadPushHeaderPlugin = new(ignoreCase)
)

func (i *ignoreCase) Name() string {
    return "ignoreCase"
}

func (i *ignoreCase) PostReadCallHeader(ctx erpc.ReadCtx) *erpc.Status {
    // Dynamic transformation path is lowercase
    ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path)
    return nil
}

func (i *ignoreCase) PostReadPushHeader(ctx erpc.ReadCtx) *erpc.Status {
    // Dynamic transformation path is lowercase
    ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path)
    return nil
}

注册以上操作和插件示例到路由

// add router group
group := peer.SubRoute("test")
// register to test group
group.RouteCall(new(Aaa), NewIgnoreCase())
peer.RouteCallFunc(XxZz, NewIgnoreCase())
group.RoutePush(new(Bbb))
peer.RoutePushFunc(YyZz)
peer.SetUnknownCall(XxxUnknownCall)
peer.SetUnknownPush(XxxUnknownPush)

配置信息

type PeerConfig struct {
    Network            string        `yaml:"network"              ini:"network"              comment:"Network; tcp, tcp4, tcp6, unix, unixpacket, kcp or quic"`
    LocalIP            string        `yaml:"local_ip"             ini:"local_ip"             comment:"Local IP"`
    ListenPort         uint16        `yaml:"listen_port"          ini:"listen_port"          comment:"Listen port; for server role"`
    DialTimeout time.Duration `yaml:"dial_timeout" ini:"dial_timeout" comment:"Default maximum duration for dialing; for client role; ns,µs,ms,s,m,h"`
    RedialTimes        int32         `yaml:"redial_times"         ini:"redial_times"         comment:"The maximum times of attempts to redial, after the connection has been unexpectedly broken; Unlimited when <0; for client role"`
	RedialInterval     time.Duration `yaml:"redial_interval"      ini:"redial_interval"      comment:"Interval of redialing each time, default 100ms; for client role; ns,µs,ms,s,m,h"`
    DefaultBodyCodec   string        `yaml:"default_body_codec"   ini:"default_body_codec"   comment:"Default body codec type id"`
    DefaultSessionAge  time.Duration `yaml:"default_session_age"  ini:"default_session_age"  comment:"Default session max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"`
    DefaultContextAge  time.Duration `yaml:"default_context_age"  ini:"default_context_age"  comment:"Default PULL or PUSH context max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"`
    SlowCometDuration  time.Duration `yaml:"slow_comet_duration"  ini:"slow_comet_duration"  comment:"Slow operation alarm threshold; ns,µs,ms,s ..."`
    PrintDetail        bool          `yaml:"print_detail"         ini:"print_detail"         comment:"Is print body and metadata or not"`
    CountTime          bool          `yaml:"count_time"           ini:"count_time"           comment:"Is count cost time or not"`
}

通信优化

  • SetMessageSizeLimit 设置报文大小的上限, 如果 maxSize<=0,上限默认为最大 uint32

    func SetMessageSizeLimit(maxMessageSize uint32)
  • SetSocketKeepAlive 是否允许操作系统的发送TCP的keepalive探测包

    func SetSocketKeepAlive(keepalive bool)
  • SetSocketKeepAlivePeriod 设置操作系统的TCP发送keepalive探测包的频度

    func SetSocketKeepAlivePeriod(d time.Duration)
  • SetSocketNoDelay 是否禁用Nagle算法,禁用后将不在合并较小数据包进行批量发送,默认为禁用

    func SetSocketNoDelay(_noDelay bool)
  • SetSocketReadBuffer 设置操作系统的TCP读缓存区的大小

    func SetSocketReadBuffer(bytes int)
  • SetSocketWriteBuffer 设置操作系统的TCP写缓存区的大小

    func SetSocketWriteBuffer(bytes int)

扩展包

编解码器

插件

协议

传输过滤器

其他模块

基于eRPC的项目

企业用户

  
  

开源协议

eRPC 项目采用商业应用友好的 Apache2.0 协议发布

网友提问

温馨提示! 即将跳转到 第三方 网站下载具体内容

点赞(93) 打赏

微信小程序

微信扫一扫体验

立即
投稿

微信公众账号

微信扫一扫加关注

发表
评论
返回
顶部