使用接口查询nebula时报错

  • nebula 版本:v2.0.0
  • 部署方式(分布式 / 单机 / Docker / DBaaS):Docker-compose
  • 是否为线上版本:Y
  • 问题的具体描述:Go 客户端调用接口执行查询语句,执行出错
testPoolConfig := nebula.GetDefaultConf()
connectionPool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
func Execute(gql string) (*nebula.ResultSet, error) {
    session, err := connectionPool.GetSession(username, password)
    if err != nil {
	fmt.Print(err.Error())
    }
    defer session.Release()
    resultSet, err = session.Execute(gql)
    return resultSet, err
}

意思是没有可用的session,可是我这是本地测试的,调试的,默认MaxConnPoolSize为10,每次获取session执行完后就会def释放。

Failed to get connection: No valid connection in the idle queue and connection number has reached the pool capacity

你在外部是咋调用的? 不建议把获取 session 和 执行语句封装在一起, 本来的设计是用一个 session 来执行语句, 都执行完后释放

谢谢,你的回答让我耳目一新,我再去看看

你的意思是这样吗:https://github.com/vesoft-inc/nebula-importer/blob/0470db1bc9310e98652f9d7da21170716c44e82a/pkg/client/clientpool.go

没毛病吧 :joy:

package nebula

import (
	"fmt"
	"github.com/sirupsen/logrus"
	"github.com/spf13/viper"
	nebula "github.com/vesoft-inc/nebula-go/v2"
	"strings"
	"sudo-graph-guam/nacos"
	"sync"
	"time"
)

var poolOnce sync.Once
var spaceDesc *SpaceDesc
var connectionPool *nebula.ConnectionPool
var log = nebula.DefaultLogger{}
var username string
var password string
var nebula_default_password = "nebula"

func InitConnectionPool() {
	poolOnce.Do(func() {
		var cfs *nacos.NebulaConfig = &nacos.NebulaConfig{}
		nacos.GetNebulaConfig(cfs)
		spaceDesc = &SpaceDesc{
			Name:   cfs.Space,
			Exists: false,
		}
		username = cfs.User
		password = cfs.Password
		newPool(cfs)
	})
}

func newPool(conf *nacos.NebulaConfig) {
	port := conf.Port
	ip := conf.Hosts[0]

	hostAddress := nebula.HostAddress{Host: ip, Port: port}
	hostList := []nebula.HostAddress{hostAddress}
	testPoolConfig := nebula.GetDefaultConf()
	testPoolConfig = nebula.PoolConfig{
		MaxConnPoolSize: 100,
	}
	pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
	connectionPool = pool
	if err != nil {
		log.Fatal(fmt.Sprintf("Fail to initialize the connection pool, host: %s, port: %d, %s", ip, port, err.Error()))
	}

	session, err := pool.GetSession(username, password)
	if err != nil {
		//尝试进行nebula初始化
		if strings.Contains(err.Error(), "Bad username/password") {
			session, err = pool.GetSession(username, nebula_default_password)
		}
		if err != nil {
			log.Fatal(fmt.Sprintf("Fail to create a new session from connection pool, username: %s, password: %s, %s",
				username, password, err.Error()))
		}
		initNebulaCql := "ALTER USER root WITH PASSWORD '" + password + "';"
		resultSet, err := session.Execute(initNebulaCql)
		if err != nil {
			fmt.Print(err.Error())
		}
		if !resultSet.IsSucceed() {
			fmt.Errorf("nebula初始化失败")
		}
		logrus.Info("完成nebula初始化")
	}

	defer session.Release()

	gql := spaceDesc.CreateSpaceString()
	resultSet, err := session.Execute(gql)
	if err != nil {
		fmt.Print(err.Error())
	}
	if !resultSet.IsSucceed() {
		fmt.Errorf("nebula创建空间%s失败", spaceDesc.Name)
	}

	//创建图空间是异步实现的,Nebula将在下一个心跳周期内(默认是10s)完成图空间的创建
	time.Sleep(10 * time.Second)
	logrus.Info("完成客户端连接初始化...")
}

func Execute(gql string) (*nebula.ResultSet, error) {
	session, err := connectionPool.GetSession(username, password)
	if err != nil {
		fmt.Print(err.Error())
	}

	defer session.Release()

	resultSet, err := session.Execute(spaceDesc.UseSpaceString())
	if err != nil {
		fmt.Print(err.Error())
	}
	if !resultSet.IsSucceed() {
		fmt.Errorf("nebula使用空间%s失败", spaceDesc.Name)
	}

	resultSet, err = session.Execute(gql)
	return resultSet, err
}

func Close() {
	connectionPool.Close()
}


	session, err := pool.GetSession(username, password)
	if err != nil {
		//尝试进行nebula初始化
		if strings.Contains(err.Error(), "Bad username/password") {
			session, err = pool.GetSession(username, nebula_default_password)
		}

当用户名密码失败的时候,之前 session 中的 connection 没释放。
可以改成类似

var session Session
firstSession, err := pool.GetSession(username, password)
defer firstSession.Release()
if err != nil {
    secondSession, err := pool.GetSession(username, nebula_default_password)
    defer secondSession.Release()
    session = secondSession
else{
    session = firstSession
}

另外,最好就是你维护一个 pool,这个池子里的 session 都是登录好的。
来一个请求,直接从 pool 里拿可以用的 session 就好了。

你说的池子指的是数组吗,就是初始化时,把登录好的session放进去,但是我怎么去拿空闲的session呢

方式很多的,主要看你业务场景是怎么样的把:

简单一点的话,每个线程持有一个 session,这个 session 是已经认证过的。然后线程 execute 就用持有的这个 session 就好

我对Go不是特别熟,所以我现在只用一个唯一的session可否,或者我直接用session数组,然后用的时候,随机获取session执行(我们业务只会使用一个space)

此话题已在最后回复的 30 天后被自动关闭。不再允许新回复。