go-micro配置介绍

go-micro/config基础特性及用法,参考官方文档

数据结构&流程图

核心数据结构

感觉go-micro的设计理念应该是一切皆是Interface,所以除了Pluggable Sources等特性外,内部的一些数据结构上也预留了插件化的可能。几个数据结构在官网上都介绍比较清楚了,主要关注下几个核心数据结构的方法。

Config

暴露给用户的对象,用于加载、读取和同步数据源。

// 动态配置的抽象接口。可以作为一个单独组建给业务使用,但框架自身依赖**DefaultConfig**实现
// 将多个source聚合成一个对外的统一数据源,支持加载、同步、观测一个或多个source。
type Config interface {
// provide the reader.Values interface
reader.Values
// Init the config
Init(opts ...Option) error
// Options in the config
Options() Options
// Stop the config loader/watcher
Close() error
// Load config sources
Load(source ...source.Source) error
// Force a source changeset sync
Sync() error
// Watch a value for changes
Watch(path ...string) (Watcher, error)
}
  • 通过Load加载数据源(source)到config内部
  • 通过Watch返回的wather观察数据更新
  • Sync写回给数据源

虽然gomicro中一切皆Interface,但内部实现依赖DefaultConfig。Config的实现依赖两个核心数据结构:

func (c *config) Init(opts ...Option) error {
c.opts = Options{
Loader: memory.NewLoader(), // Loader
Reader: json.NewReader(), // Reader
}
c.exit = make(chan bool)
for _, o := range opts {
o(&c.opts)
}

// ...
}

Loader负责加载Source、管理watcher链表、管理sets数据、执行Merge操作等,为Config屏蔽了不少下层逻辑,使对外的接口尽可能干净。
Reader就比较好理解了,参见官方文档。

注意Config中存在一个唯一的goroutine,用来聚合各个Source的变更。每个Source在Loader中也维护了一个goroutine,详见后面Loader部分。

func (c *config) run() {
watch := func(w loader.Watcher) error {
for {
// get changeset
snap, err := w.Next()
if err != nil {
return err
}

c.Lock()

// save
c.snap = snap

// set values
c.vals, _ = c.opts.Reader.Values(snap.ChangeSet)

c.Unlock()
}
}

Source

配置内容源,支持以下格式:

  • configmap - k8s configmap
  • consul - consul
  • etcd - etcd v3
  • env - 环境变量
  • file - 配置文件
  • flag - 命令行标识
  • grpc - grpc
  • memory - 内存
// Source is the source from which config is loaded
type Source interface {
Read() (*ChangeSet, error) // 提供给Loadder使用,通过Read读取到ChangeSet
Write(*ChangeSet) error
Watch() (Watcher, error) // 创建一个Source对应的Watcher
String() string
}

// 代表从Source读取的内容。可能为全量,也可能为增量,取决于Source的实现
type ChangeSet struct {
Data []byte
Checksum string
Format string
Source string
Timestamp time.Time
}

// Watcher watches a source for changes
type Watcher interface {
Next() (*ChangeSet, error) // 无变化时会block
Stop() error
}

Watcher的底层实现还是依赖各Source的watcher,比如file source,就引入了”fsnotify/fsnotify”来监控文件的变化。每个Source被Loader Load时会生成一个对应的source watcher goroutine,用于监控source的变更。
各种Source的实现就没有具体关注了。

Loader

用于加载Source,实现的核心数据结构。

// Loader manages loading sources
type Loader interface {
// Stop the loader
Close() error
// Load the sources
Load(...source.Source) error
// A Snapshot of loaded config
Snapshot() (*Snapshot, error)
// Force sync of sources
Sync() error
// Watch for changes
Watch(...string) (Watcher, error)
// Name of loader
String() string
}

// Watcher lets you watch sources and returns a merged ChangeSet
type Watcher interface {
// First call to next may return the current Snapshot
// If you are watching a path then only the data from
// that path is returned.
Next() (*Snapshot, error)
// Stop watching for changes
Stop() error
}

// 功能:ChangeSet的聚合。
// 场景:当loader重新加载数据之后会保存一份snapshot
type Snapshot struct {
// The merged ChangeSet
ChangeSet *source.ChangeSet
// Deterministic and comparable version of the snapshot
Version string
}

框架中提供了一个缺省的memory loader,Loader的存在主要基于以下几个原因:
1、Source是可以聚合(Merge)的,在Reader中提供了Merge的实现。当系统存在多份Source时,需要将Source聚合好之后给到Config,否则Config中就得处理此类逻辑。
2、对Source的failover。当Load失败时,提供了Snapshot能力。
Loader还维护了核心的数据结构:watcher列表。source watcher goroutine发现内容变更时会将变更的内容通知给所有goroutine(通过Get读取对应的path值),“通知”只是将内容写到了update 队列。具体的变更逻辑还得外部调用方调用Next感知。
注意调用方增加Watcher时,Loader只是在watcher list中增加一个节点,没有goroutine的开销。但是Suorce的内容变更会通知所有wathcher,wather会根据内容的变化情况(bytes.Equal)来决定是否通知上层变更。

    // Loader中通知各个watcher,内容可能已变化,写入changeset
for _, w := range watchers {
select {
case w.updates <- m.vals.Get(w.path...):
default:
}
}

// ...


// Loader提供的变更获取接口,读取changeset
func (w *watcher) Next() (*loader.Snapshot, error) {
for {
select {
case <-w.exit:
return nil, errors.New("watcher stopped")
case v := <-w.updates:
if bytes.Equal(w.value.Bytes(), v.Bytes()) {
continue
}
w.value = v

cs := &source.ChangeSet{
Data: v.Bytes(),
Format: w.reader.String(),
Source: "memory",
Timestamp: time.Now(),
}
cs.Sum()

return &loader.Snapshot{
ChangeSet: cs,
Version: fmt.Sprintf("%d", time.Now().Unix()),
}, nil
}
}
}

// config暴露给外部的watcher接口, 封装Loader的Next接口
func (w *watcher) Next() (reader.Value, error) {
for {
s, err := w.lw.Next() // 内部调用loader的Next获取ChangeSet
if err != nil {
return nil, err
}

// only process changes
if bytes.Equal(w.value.Bytes(), s.ChangeSet.Data) {
continue
}

v, err := w.rd.Values(s.ChangeSet)
if err != nil {
return nil, err
}

w.value = v.Get()
return w.value, nil
}
}

另上面也提供,config中也维护了一个gouroutine,watch的path是各个source的聚合。当一个Source的内容变更时需要将各个Source内容重新聚合后返回给config,以更新Config中的snapshot和value值。

总结

图中画得比较明白了,比较难以理解的地方也在于Loader为支持多个Source的一些操作,包括sets列表、Merge操作。当理解需要支持多个Source的聚合之后就比较好理解了。

整体感觉config的封装过于复杂了,包括的viper的引入还进行了改造,在代码实现中有比较多的类似的逻辑,包括命名也有不少重复,阅读起来有点心累。估计代码圈复杂度比较高。。