BIN
doc/images/mr_time.png
Normal file
BIN
doc/images/mr_time.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 104 KiB |
@@ -1,6 +1,14 @@
|
||||
# 通过MapReduce降低服务响应时间
|
||||
|
||||
go-zero微服务框架中提供了许多开箱即用的工具,好的工具不仅能提升服务的性能而且还能提升代码的鲁棒性避免出错,实现代码风格的统一方便他人阅读等等,本系列文章将分别介绍go-zero框架中工具的使用及其实现原理
|
||||
在微服务中开发中,api网关扮演对外提供restful api的角色,而api的数据往往会依赖其他服务,复杂的api更是会依赖多个甚至数十个服务。虽然单个被依赖服务的耗时一般都比较低,但如果多个服务串行依赖的话那么整个api的耗时将会大大增加。
|
||||
|
||||
那么通过什么手段来优化呢?我们首先想到的是通过并发来的方式来处理依赖,这样就能降低整个依赖的耗时,Go基础库中为我们提供了 [WaitGroup](https://golang.org/pkg/sync/#WaitGroup) 工具用来进行并发控制,但实际业务场景中多个依赖如果有一个出错我们期望能立即返回而不是等所有依赖都执行完再返回结果,而且WaitGroup中对变量的赋值往往需要加锁,每个依赖函数都需要添加Add和Done对于新手来说比较容易出错
|
||||
|
||||
基于以上的背景,go-zero框架中为我们提供了并发处理工具[MapReduce](https://github.com/tal-tech/go-zero/blob/master/core/mr/mapreduce.go),该工具开箱即用,不需要做什么初始化,我们通过下图看下使用MapReduce和没使用的耗时对比:
|
||||
|
||||

|
||||
|
||||
相同的依赖,串行处理的话需要200ms,使用MapReduce后的耗时等于所有依赖中最大的耗时为100ms,可见MapReduce可以大大降低服务耗时,而且随着依赖的增加效果就会越明显,减少处理耗时的同时并不会增加服务器压力
|
||||
|
||||
## 并发处理工具[MapReduce](https://github.com/tal-tech/go-zero/tree/master/core/mr)
|
||||
|
||||
|
||||
114
example/mapreduce/mr/mr.go
Normal file
114
example/mapreduce/mr/mr.go
Normal file
@@ -0,0 +1,114 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/tal-tech/go-zero/core/mr"
|
||||
"github.com/tal-tech/go-zero/core/timex"
|
||||
)
|
||||
|
||||
type user struct{}
|
||||
|
||||
func (u *user) User(uid int64) (interface{}, error) {
|
||||
time.Sleep(time.Millisecond * 30)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type store struct{}
|
||||
|
||||
func (s *store) Store(pid int64) (interface{}, error) {
|
||||
time.Sleep(time.Millisecond * 50)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type order struct{}
|
||||
|
||||
func (o *order) Order(pid int64) (interface{}, error) {
|
||||
time.Sleep(time.Millisecond * 40)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var (
|
||||
userRpc user
|
||||
storeRpc store
|
||||
orderRpc order
|
||||
)
|
||||
|
||||
func main() {
|
||||
start := timex.Now()
|
||||
_, err := productDetail(123, 345)
|
||||
if err != nil {
|
||||
log.Printf("product detail error: %v", err)
|
||||
return
|
||||
}
|
||||
log.Printf("productDetail time: %v", timex.Since(start))
|
||||
|
||||
// the data processing
|
||||
res, err := checkLegal([]int64{1, 2, 3})
|
||||
if err != nil {
|
||||
log.Printf("check error: %v", err)
|
||||
return
|
||||
}
|
||||
log.Printf("check res: %v", res)
|
||||
}
|
||||
|
||||
type ProductDetail struct {
|
||||
User interface{}
|
||||
Store interface{}
|
||||
Order interface{}
|
||||
}
|
||||
|
||||
func productDetail(uid, pid int64) (*ProductDetail, error) {
|
||||
var pd ProductDetail
|
||||
err := mr.Finish(func() (err error) {
|
||||
pd.User, err = userRpc.User(uid)
|
||||
return
|
||||
}, func() (err error) {
|
||||
pd.Store, err = storeRpc.Store(pid)
|
||||
return
|
||||
}, func() (err error) {
|
||||
pd.Order, err = orderRpc.Order(pid)
|
||||
return
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &pd, nil
|
||||
}
|
||||
|
||||
func checkLegal(uids []int64) ([]int64, error) {
|
||||
r, err := mr.MapReduce(func(source chan<- interface{}) {
|
||||
for _, uid := range uids {
|
||||
source <- uid
|
||||
}
|
||||
}, func(item interface{}, writer mr.Writer, cancel func(error)) {
|
||||
uid := item.(int64)
|
||||
ok, err := check(uid)
|
||||
if err != nil {
|
||||
cancel(err)
|
||||
}
|
||||
if ok {
|
||||
writer.Write(uid)
|
||||
}
|
||||
}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
|
||||
var uids []int64
|
||||
for p := range pipe {
|
||||
uids = append(uids, p.(int64))
|
||||
}
|
||||
writer.Write(uids)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.([]int64), nil
|
||||
}
|
||||
|
||||
func check(uid int64) (bool, error) {
|
||||
// do something check user legal
|
||||
time.Sleep(time.Millisecond * 20)
|
||||
return true, nil
|
||||
}
|
||||
Reference in New Issue
Block a user