eRPC
eRPC 是一个高效、可扩展且简单易用的 RPC 框架。
适用于 RPC、微服务、点对点长连接、IM 和游戏等领域。
安装
-
go vesion ≥ 1.11
-
install
GO111MODULE=on go get -u -v -insecure github.com/andeya/erpc/v7
- import
import "github.com/andeya/erpc/v7"
特性
- 使用 peer 为 server 和 client 提供相同的 API 封装
- 提供多层抽象,如:
- peer
- session/socket
- router
- handle/context
- message
- protocol
- codec
- transfer filter
- plugin
- 支持平滑重启和关闭
- 兼容 HTTP 的消息格式:
- 由
Header
和Body
两部分组成 -
Header
包含与 HTTP header 格式相同的 metadata -
Body
支持类似 Content Type 的自定义编解码器,已经实现的:- Protobuf
- Thrift
- JSON
- XML
- Form
- Plain
- 支持 push、call-reply 和更多的消息类型
- 由
- 支持自定义消息协议,并提供了一些常见实现:
-
rawproto
- 默认的高性能二进制协议 -
jsonproto
- JSON 消息协议 -
pbproto
- Ptotobuf 消息协议 -
thriftproto
- Thrift 消息协议 -
httproto
- HTTP 消息协议
-
- 可优化的高性能传输层
- 使用 Non-block socket 和 I/O 多路复用技术
- 支持设置套接字 I/O 的缓冲区大小
- 支持设置读取消息的大小(如果超过则断开连接)
- 支持控制连接的文件描述符
- 支持多种网络类型:
tcp
tcp4
tcp6
unix
unixpacket
kcp
quic
- 其他
- websocket
- evio
- 提供丰富的插件埋点,并已实现:
- 强大灵活的日志系统:
- 详细的日志信息,支持打印输入和输出详细信息
- 支持设置慢操作警报阈值
- 支持自定义实现日志组件
- 客户端会话支持在断开连接后自动重拨
性能测试
自测
-
一个服务端与一个客户端进程,在同一台机器上运行
-
CPU: Intel Xeon E312xx (Sandy Bridge) 16 cores 2.53GHz
-
Memory: 16G
-
OS: Linux 2.6.32-696.16.1.el6.centos.plus.x86_64, CentOS 6.4
-
Go: 1.9.2
-
信息大小: 581 bytes
-
信息编码:protobuf
-
发送 1000000 条信息
-
erpc
- erpc/socket
对比测试
More Detail
- CPU耗时火焰图 erpc/socket
svg file
- 堆栈信息火焰图 erpc/socket
svg file
代码示例
server.go
package main import ( "fmt" "time" "github.com/andeya/erpc/v7" ) func main() { defer erpc.FlushLogger() // graceful go erpc.GraceSignal() // server peer srv := erpc.NewPeer(erpc.PeerConfig{ CountTime: true, ListenPort: 9090, PrintDetail: true, }) // srv.SetTLSConfig(erpc.GenerateTLSConfigForServer()) // router srv.RouteCall(new(Math)) // broadcast per 5s go func() { for { time.Sleep(time.Second * 5) srv.RangeSession(func(sess erpc.Session) bool { sess.Push( "/push/status", fmt.Sprintf("this is a broadcast, server time: %v", time.Now()), ) return true }) } }() // listen and serve srv.ListenAndServe() } // Math handler type Math struct { erpc.CallCtx } // Add handles addition request func (m *Math) Add(arg *[]int) (int, *erpc.Status) { // test meta erpc.Infof("author: %s", m.PeekMeta("author")) // add var r int for _, a := range *arg { r = a } // response return r, nil }
client.go
package main import ( "time" "github.com/andeya/erpc/v7" ) func main() { defer erpc.SetLoggerLevel("ERROR")() cli := erpc.NewPeer(erpc.PeerConfig{}) defer cli.Close() // cli.SetTLSConfig(&tls.Config{InsecureSkipVerify: true}) cli.RoutePush(new(Push)) sess, stat := cli.Dial(":9090") if !stat.OK() { erpc.Fatalf("%v", stat) } var result int stat = sess.Call("/math/add", []int{1, 2, 3, 4, 5}, &result, erpc.WithAddMeta("author", "andeya"), ).Status() if !stat.OK() { erpc.Fatalf("%v", stat) } erpc.Printf("result: %d", result) erpc.Printf("Wait 10 seconds to receive the push...") time.Sleep(time.Second * 10) } // Push push handler type Push struct { erpc.PushCtx } // Push handles '/push/status' message func (p *Push) Status(arg *string) *erpc.Status { erpc.Printf("%s", *arg) return nil }
更多示例
用法
NOTE:
- 最好设置读包时大小限制:
SetReadLimit
- 默认读包时大小限制为 1 GB
Peer端点(服务端或客户端)示例
// Start a server var peer1 = erpc.NewPeer(erpc.PeerConfig{ ListenPort: 9090, // for server role }) peer1.Listen() ... // Start a client var peer2 = erpc.NewPeer(erpc.PeerConfig{}) var sess, err = peer2.Dial("127.0.0.1:8080")
自带ServiceMethod映射规则
-
结构体或方法名称到服务方法名称的默认映射(HTTPServiceMethodMapper):
-
AaBb
->/aa_bb
-
ABcXYz
->/abc_xyz
-
Aa__Bb
->/aa_bb
-
aa__bb
->/aa_bb
-
ABC__XYZ
->/abc_xyz
-
Aa_Bb
->/aa/bb
-
aa_bb
->/aa/bb<365>
-
ABC_XYZ
->/abc/xyz
erpc.SetServiceMethodMapper(erpc.HTTPServiceMethodMapper)
-
-
结构体或方法名称到服务方法名称的映射(RPCServiceMethodMapper):
-
AaBb
->AaBb
-
ABcXYz
->ABcXYz
-
Aa__Bb
->Aa_Bb
-
aa__bb
->aa_bb
-
ABC__XYZ
->ABC_XYZ
-
Aa_Bb
->Aa.Bb
-
aa_bb
->aa.bb
-
ABC_XYZ
->ABC.XYZ
erpc.SetServiceMethodMapper(erpc.RPCServiceMethodMapper)
-
Call-Struct 接口模版
type Aaa struct { erpc.CallCtx } func (x *Aaa) XxZz(arg *<T>) (<T>, *erpc.Status) { ... return r, nil }
- 注册到根路由:
// register the call route // HTTP mapping: /aaa/xx_zz // RPC mapping: Aaa.XxZz peer.RouteCall(new(Aaa)) // or register the call route // HTTP mapping: /xx_zz // RPC mapping: XxZz peer.RouteCallFunc((*Aaa).XxZz)
Call-Function 接口模板
func XxZz(ctx erpc.CallCtx, arg *<T>) (<T>, *erpc.Status) { ... return r, nil }
- 注册到根路由:
// register the call route // HTTP mapping: /xx_zz // RPC mapping: XxZz peer.RouteCallFunc(XxZz)
Push-Struct 接口模板
type Bbb struct { erpc.PushCtx } func (b *Bbb) YyZz(arg *<T>) *erpc.Status { ... return nil }
- 注册到根路由:
// register the push handler // HTTP mapping: /bbb/yy_zz // RPC mapping: Bbb.YyZz peer.RoutePush(new(Bbb)) // or register the push handler // HTTP mapping: /yy_zz // RPC mapping: YyZz peer.RoutePushFunc((*Bbb).YyZz)
Push-Function 接口模板
// YyZz register the handler func YyZz(ctx erpc.PushCtx, arg *<T>) *erpc.Status { ... return nil }
- 注册到根路由:
// register the push handler // HTTP mapping: /yy_zz // RPC mapping: YyZz peer.RoutePushFunc(YyZz)
Unknown-Call-Function 接口模板
func XxxUnknownCall (ctx erpc.UnknownCallCtx) (interface{}, *erpc.Status) { ... return r, nil }
- 注册到根路由:
// register the unknown pull route: /* peer.SetUnknownCall(XxxUnknownCall)
Unknown-Push-Function 接口模板
func XxxUnknownPush(ctx erpc.UnknownPushCtx) *erpc.Status { ... return nil }
- 注册到根路由:
// register the unknown push route: /* peer.SetUnknownPush(XxxUnknownPush)
插件示例
// NewIgnoreCase Returns a ignoreCase plugin. func NewIgnoreCase() *ignoreCase { return &ignoreCase{} } type ignoreCase struct{} var ( _ erpc.PostReadCallHeaderPlugin = new(ignoreCase) _ erpc.PostReadPushHeaderPlugin = new(ignoreCase) ) func (i *ignoreCase) Name() string { return "ignoreCase" } func (i *ignoreCase) PostReadCallHeader(ctx erpc.ReadCtx) *erpc.Status { // Dynamic transformation path is lowercase ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path) return nil } func (i *ignoreCase) PostReadPushHeader(ctx erpc.ReadCtx) *erpc.Status { // Dynamic transformation path is lowercase ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path) return nil }
注册以上操作和插件示例到路由
// add router group group := peer.SubRoute("test") // register to test group group.RouteCall(new(Aaa), NewIgnoreCase()) peer.RouteCallFunc(XxZz, NewIgnoreCase()) group.RoutePush(new(Bbb)) peer.RoutePushFunc(YyZz) peer.SetUnknownCall(XxxUnknownCall) peer.SetUnknownPush(XxxUnknownPush)
配置信息
type PeerConfig struct { Network string `yaml:"network" ini:"network" comment:"Network; tcp, tcp4, tcp6, unix, unixpacket, kcp or quic"` LocalIP string `yaml:"local_ip" ini:"local_ip" comment:"Local IP"` ListenPort uint16 `yaml:"listen_port" ini:"listen_port" comment:"Listen port; for server role"` DialTimeout time.Duration `yaml:"dial_timeout" ini:"dial_timeout" comment:"Default maximum duration for dialing; for client role; ns,µs,ms,s,m,h"` RedialTimes int32 `yaml:"redial_times" ini:"redial_times" comment:"The maximum times of attempts to redial, after the connection has been unexpectedly broken; Unlimited when <0; for client role"` RedialInterval time.Duration `yaml:"redial_interval" ini:"redial_interval" comment:"Interval of redialing each time, default 100ms; for client role; ns,µs,ms,s,m,h"` DefaultBodyCodec string `yaml:"default_body_codec" ini:"default_body_codec" comment:"Default body codec type id"` DefaultSessionAge time.Duration `yaml:"default_session_age" ini:"default_session_age" comment:"Default session max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"` DefaultContextAge time.Duration `yaml:"default_context_age" ini:"default_context_age" comment:"Default PULL or PUSH context max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"` SlowCometDuration time.Duration `yaml:"slow_comet_duration" ini:"slow_comet_duration" comment:"Slow operation alarm threshold; ns,µs,ms,s ..."` PrintDetail bool `yaml:"print_detail" ini:"print_detail" comment:"Is print body and metadata or not"` CountTime bool `yaml:"count_time" ini:"count_time" comment:"Is count cost time or not"` }
通信优化
-
SetMessageSizeLimit 设置报文大小的上限, 如果 maxSize<=0,上限默认为最大 uint32
func SetMessageSizeLimit(maxMessageSize uint32)
-
SetSocketKeepAlive 是否允许操作系统的发送TCP的keepalive探测包
func SetSocketKeepAlive(keepalive bool)
-
SetSocketKeepAlivePeriod 设置操作系统的TCP发送keepalive探测包的频度
func SetSocketKeepAlivePeriod(d time.Duration)
-
SetSocketNoDelay 是否禁用Nagle算法,禁用后将不在合并较小数据包进行批量发送,默认为禁用
func SetSocketNoDelay(_noDelay bool)
-
SetSocketReadBuffer 设置操作系统的TCP读缓存区的大小
func SetSocketReadBuffer(bytes int)
-
SetSocketWriteBuffer 设置操作系统的TCP写缓存区的大小
func SetSocketWriteBuffer(bytes int)
扩展包
编解码器
插件
协议
传输过滤器
其他模块
基于eRPC的项目
企业用户
开源协议
eRPC 项目采用商业应用友好的 Apache2.0 协议发布