Markdown lint (#58)
* markdown linter * format markdown docs * format exiting markdown docs
This commit is contained in:
@@ -24,7 +24,7 @@ MapReduce主要有三个参数,第一个参数为generate用以生产数据,
|
||||
|
||||
场景一: 某些功能的结果往往需要依赖多个服务,比如商品详情的结果往往会依赖用户服务、库存服务、订单服务等等,一般被依赖的服务都是以rpc的形式对外提供,为了降低依赖的耗时我们往往需要对依赖做并行处理
|
||||
|
||||
```
|
||||
```go
|
||||
func productDetail(uid, pid int64) (*ProductDetail, error) {
|
||||
var pd ProductDetail
|
||||
err := mr.Finish(func() (err error) {
|
||||
@@ -42,15 +42,16 @@ func productDetail(uid, pid int64) (*ProductDetail, error) {
|
||||
log.Printf("product detail error: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
return &pd, nil
|
||||
}
|
||||
```
|
||||
|
||||
该示例中返回商品详情依赖了多个服务获取数据,因此做并发的依赖处理,对接口的性能有很大的提升
|
||||
|
||||
场景二: 很多时候我们需要对一批数据进行处理,比如对一批用户id,效验每个用户的合法性并且效验过程中有一个出错就认为效验失败,返回的结果为效验合法的用户id
|
||||
|
||||
```
|
||||
```go
|
||||
func checkLegal(uids []int64) ([]int64, error) {
|
||||
r, err := mr.MapReduce(func(source chan<- interface{}) {
|
||||
for _, uid := range uids {
|
||||
@@ -85,9 +86,11 @@ func check(uid int64) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
```
|
||||
|
||||
该示例中,如果check过程出现错误则通过cancel方法结束效验过程,并返回error整个效验过程结束,如果某个uid效验结果为false则最终结果不返回该uid
|
||||
|
||||
**MapReduce使用注意事项**
|
||||
***MapReduce使用注意事项***
|
||||
|
||||
* mapper和reducer中都可以调用cancel,参数为error,调用后立即返回,返回结果为nil, error
|
||||
* mapper中如果不调用writer.Write则item最终不会被reducer聚合
|
||||
* reducer中如果不调用writer.Wirte则返回结果为nil, ErrReduceNoOutput
|
||||
@@ -96,12 +99,13 @@ func check(uid int64) (bool, error) {
|
||||
***实现原理分析:***
|
||||
|
||||
MapReduce中首先通过buildSource方法通过执行generate(参数为无缓冲channel)产生数据,并返回无缓冲的channel,mapper会从该channel中读取数据
|
||||
```
|
||||
|
||||
```go
|
||||
func buildSource(generate GenerateFunc) chan interface{} {
|
||||
source := make(chan interface{})
|
||||
go func() {
|
||||
defer close(source)
|
||||
generate(source)
|
||||
generate(source)
|
||||
}()
|
||||
|
||||
return source
|
||||
@@ -109,7 +113,8 @@ func buildSource(generate GenerateFunc) chan interface{} {
|
||||
```
|
||||
|
||||
在MapReduceWithSource方法中定义了cancel方法,mapper和reducer中都可以调用该方法,调用后主线程收到close信号会立马返回
|
||||
```
|
||||
|
||||
```go
|
||||
cancel := once(func(err error) {
|
||||
if err != nil {
|
||||
retErr.Set(err)
|
||||
@@ -125,7 +130,8 @@ cancel := once(func(err error) {
|
||||
```
|
||||
|
||||
在mapperDispatcher方法中调用了executeMappers,executeMappers消费buildSource产生的数据,每一个item都会起一个goroutine单独处理,默认最大并发数为16,可以通过WithWorkers进行设置
|
||||
```
|
||||
|
||||
```go
|
||||
var wg sync.WaitGroup
|
||||
defer func() {
|
||||
wg.Wait() // 保证所有的item都处理完成
|
||||
@@ -159,7 +165,8 @@ for {
|
||||
```
|
||||
|
||||
reducer单goroutine对数mapper写入collector的数据进行处理,如果reducer中没有手动调用writer.Write则最终会执行finish方法对output进行close避免死锁
|
||||
```
|
||||
|
||||
```go
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
@@ -172,13 +179,12 @@ go func() {
|
||||
}()
|
||||
```
|
||||
|
||||
在该工具包中还提供了许多针对不同业务场景的方法,实现原理与MapReduce大同小异,感兴趣的同学可以查看源码学习
|
||||
在该工具包中还提供了许多针对不同业务场景的方法,实现原理与MapReduce大同小异,感兴趣的同学可以查看源码学习
|
||||
|
||||
* MapReduceVoid 功能和MapReduce类似但没有结果返回只返回error
|
||||
* Finish 处理固定数量的依赖,返回error,有一个error立即返回
|
||||
* FinishVoid 和Finish方法功能类似,没有返回值
|
||||
* Map 只做generate和mapper处理,返回channel
|
||||
* MapVoid 和Map功能类似,无返回
|
||||
|
||||
|
||||
本文主要介绍了go-zero框架中的MapReduce工具,在实际的项目中非常实用。用好工具对于提升服务性能和开发效率都有很大的帮助,希望本篇文章能给大家带来一些收获。
|
||||
|
||||
|
||||
Reference in New Issue
Block a user