Go 1.3中已然带了连接池,直接使用sync.Pool即可。但Go 1.2就得亲力亲为的自己设计了。
我们设计连接池需要满足以下几个需求:
尽量保持较多的长连接,加速网络访问过程。但需要按需增加。并同步的取、存。我们可以用channel来维护连接池。
当不再需要这么多长连接,或超过设定的连接池大小时应当回收资源。即需要设定channel大小,并且定时回收不在需要的资源。
如果网络故障(另一端重启服务了),之前的connection对象就坏掉了,则应当关闭连接回收资源,而不应当放回连接池。
设计Pool结构:
type Pool struct {
shared chan interface{} //连接池存储体
//创建一个新的连接
New func() (interface{}, error)
//关闭连接,视具体情况,可能需要较针对特定的连接客户端予以关闭。
Close func(interface{})
poolSize int //连接池大小
recycleTimeout int //回收连接的周期时间,
}
那我们依次予以实现即可完成Pool。最基本的是需要取存连接。
取连接:
func (p *Pool) Get() (interface{}, error) {
if nil == p.shared {
p.Init()
}
if nil == p.New {
log.Println("No 'New' method is specified for the pool")
return nil, nil
} else {
select {
//pop up one element from the pool
case x := <-p.shared:
return x, nil
//or else create a new one
default:
return p.New()
}
}
}
其中需要判断,如果该Struct还未初始化完全,当用默认值初始化(调用p.Init())
放回连接:
// Put element 'x' back to the pool.
func (p *Pool) Put(x interface{}, lastError error) {
if nil == x {
return
} else if nil != lastError {
//error occurs in x, so close 'x' from the pool and never push it back
p.Close(x)
return
} else {
select {
//push back the element to the pool 'shared'
case p.shared <- x:
//or else remove items when item size exceeds poolSize
default:
p.Close(x)
}
}
}
若之前调用连接x出错(网络暂时中断需要重连等),则应丢弃x而不在放回连接池。而如果连接池已满(chnanel中不再能接受新连接),则也关闭该连接释放资源。因为此时已经表示有poolSize个连接已经闲置在连接池中而被取走使用了。select中某个case能传递消息,则不会调用default,则就可以拿来判断连接池是否已满。
而初始化及定时回收盈余连接则实现如下:
func (p *Pool) Init() *Pool {
if 0 >= p.poolSize {
p.poolSize = defaultPoolSize
}
if nil == p.shared {
p.shared = make(chan interface{}, p.poolSize)
}
if 0 >= p.recycleTimeout {
p.recycleTimeout = defaultRecycleTimeout
}
go func() {
for {
select {
case <-time.After(time.Duration(p.recycleTimeout) * time.Minute):
for {
select {
case c := <-p.shared:
p.Close(c)
default:
goto END
}
}
END:
}
}
}()
return p
}
select设定超时recycleTimeout分钟后,从连接池中取出所有连接并销毁(放在连接池,就表示此刻未被取走利用,已经闲置了)。取完所有连接后跳转到END,再次等待recycleTimeout分钟。
而使用中只需要实现其中两个函数即可:
var pool = Pool{
New: func() (interface{}, error) {
client, err := InitSocket(GetAServer())
return client, err
},
Close: func(x interface{}) {
x.(*tokenizer.SearchTokenizerClient).Transport.Close()
},
}
完整Pool代码实现见 https://github.com/jinntrance/pool
Original post: http://blog.josephjctang.com/2014-09/go-pool/