Go 1.2 连接池设计

Reading time ~19 minutes

Go 1.3中已然带了连接池,直接使用sync.Pool即可。但Go 1.2就得亲力亲为的自己设计了。

我们设计连接池需要满足以下几个需求:

  1. 尽量保持较多的长连接,加速网络访问过程。但需要按需增加。并同步的取、存。我们可以用channel来维护连接池。   

  2. 当不再需要这么多长连接,或超过设定的连接池大小时应当回收资源。即需要设定channel大小,并且定时回收不在需要的资源。   

  3. 如果网络故障(另一端重启服务了),之前的connection对象就坏掉了,则应当关闭连接回收资源,而不应当放回连接池。 

设计Pool结构:

    type Pool struct {
        shared chan interface{} //连接池存储体
        //创建一个新的连接
        New func() (interface{}, error) 
        //关闭连接,视具体情况,可能需要较针对特定的连接客户端予以关闭。
        Close          func(interface{})
        poolSize       int //连接池大小
        recycleTimeout int  //回收连接的周期时间,
    }

那我们依次予以实现即可完成Pool。最基本的是需要取存连接。

取连接:

    func (p *Pool) Get() (interface{}, error) {
        if nil == p.shared {
            p.Init()
        }
        if nil == p.New {
            log.Println("No 'New' method is specified for the pool")
            return nil, nil
        } else {
            select {
            //pop up one element from the pool
            case x := <-p.shared:
                return x, nil
                //or else create a new one
            default:
                return p.New()
            }
        }
    }

其中需要判断,如果该Struct还未初始化完全,当用默认值初始化(调用p.Init())

放回连接:

    // Put element 'x' back to the pool.
    func (p *Pool) Put(x interface{}, lastError error) {
        if nil == x {
            return
        } else if nil != lastError {
            //error occurs in x, so close 'x' from the pool and never push it back
            p.Close(x)
            return
        } else {
            select {
            //push back the element to the pool 'shared'
            case p.shared <- x:
            //or else remove items when item size exceeds poolSize
            default:
                p.Close(x)
            }
        }
    }

若之前调用连接x出错(网络暂时中断需要重连等),则应丢弃x而不在放回连接池。而如果连接池已满(chnanel中不再能接受新连接),则也关闭该连接释放资源。因为此时已经表示有poolSize个连接已经闲置在连接池中而被取走使用了。select中某个case能传递消息,则不会调用default,则就可以拿来判断连接池是否已满。    

而初始化及定时回收盈余连接则实现如下:

    func (p *Pool) Init() *Pool {
        if 0 >= p.poolSize {
            p.poolSize = defaultPoolSize
        }
        if nil == p.shared {
            p.shared = make(chan interface{}, p.poolSize)
        }
        if 0 >= p.recycleTimeout {
            p.recycleTimeout = defaultRecycleTimeout
        }
        go func() {
            for {
                select {
                case <-time.After(time.Duration(p.recycleTimeout) * time.Minute):
                    for {
                        select {
                        case c := <-p.shared:
                            p.Close(c)
                        default:
                            goto END
                        }
                    }
                END:
                }
            }
        }()
        return p
    }

  select设定超时recycleTimeout分钟后,从连接池中取出所有连接并销毁(放在连接池,就表示此刻未被取走利用,已经闲置了)。取完所有连接后跳转到END,再次等待recycleTimeout分钟。   

而使用中只需要实现其中两个函数即可:

    var pool = Pool{
        New: func() (interface{}, error) {
            client, err := InitSocket(GetAServer())
            return client, err
        },
        Close: func(x interface{}) {
            x.(*tokenizer.SearchTokenizerClient).Transport.Close()
        },
    }

完整Pool代码实现见 https://github.com/jinntrance/pool

Original post: http://blog.josephjctang.com/2014-09/go-pool/

重操堆文这旧业之时间日志分析

写作的习惯,已经断了许久了。上一篇已经是去年的文章了,也是做的[时间日志分析]({% post_url 2019-02-23-time-log-analysis %}) 。 正好做时间总结,也把写作捡起来吧。# 低效时间分析用 [RescueTime](https://www.rescueti...… Continue reading

个人近期时间日志分析

Published on February 23, 2019

问问题的妙用

Published on May 27, 2018