nebula-java scan returns no data

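Problem description: I use the Java client to connect to nebula and export data with the scan operation (com.vesoft.nebula.client.storage.StorageClient#scanVertex()). The returned Iterator reports hasNext as false, even though the nebula database does contain data.
nebula version: 2020.06.29-nightly
nebula-java version: 1.0.0-rc4.20200323
nebula port information: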

[INFO] nebula-metad: Running as 36200, Listening on 45500
[INFO] nebula-graphd: Running as 36210, Listening on 3699
[INFO] nebula-storaged: Running as 36220, Listening on 44500

Output of nebula SHOW HOSTS: [screenshot]

Data in the nebula database:

[root@p88230v bin]# ./db_dump -space=graphx -db_path=/usr/local/nebula_v1_new/data/storage/nebula/ -mode=stat -parts=1,2,3
===========================PARAMS============================
mode: stat
meta server: 127.0.0.1:45500
space: graphx
path: /usr/local/nebula_v1_new/data/storage/nebula/
parts: 1,2,3
vids: 
tags: 
edges: 
limit: 1000
===========================PARAMS============================

===========================STATISTICS============================
COUNT: 1000
VERTEX COUNT: 159
EDGE COUNT: 841
TAG STATISTICS: 
        mobile : 125

Java code:

public class NebulaVertexScanTest{

    private static final Logger LOGGER = LoggerFactory.getLogger(NebulaVertexScanTest.class);
    private static MetaClient metaClient;
    private static StorageClient storageClient;
    private static ScanVertexProcessor processor;

    private static void scanVertex(String space, int part, Map<String, List<String>> returnCols,
                                   boolean allCols) {
        System.out.println(("Start to scan space " + space + " part " + part));
        try {
            Iterator<ScanVertexResponse> iterator =
                    storageClient.scanVertex(space, part, returnCols, allCols,
                            100, 0L, Long.MAX_VALUE);
            while (iterator.hasNext()) {
                ScanVertexResponse response = iterator.next();
                if (response == null) {
                    System.err.println(("Error occurs while scan vertex"));
                    break;
                }
                process(space, response);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void process(String space, ScanVertexResponse response) {
//        Result result = processor.process(space, response);
        System.out.println(("process " + response + " vertices"));
    }

    @Test
    public void testScan() {
        try {
            MetaClientImpl metaClientImpl = new MetaClientImpl("127.0.0.1", 45500);
            metaClientImpl.connect();

            metaClient = metaClientImpl;

            StorageClientImpl storageClientImpl = new StorageClientImpl(metaClientImpl);
            storageClient = storageClientImpl;

            processor = new ScanVertexProcessor(metaClientImpl);

            Map<String, List<String>> returnCols = new HashMap<>();
            returnCols.put("mobile", Arrays.asList());

            boolean allCols = true;

            for (Map.Entry<String, Map<Integer, List<HostAndPort>>> spaceEntry :
                    metaClient.getPartsAllocFromCache().entrySet()) {
                String space = spaceEntry.getKey();
                for (Integer part : spaceEntry.getValue().keySet()) {
                    scanVertex(space, part, returnCols, allCols);
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Log output:

Start to scan space graphx part 1
process ScanVertexResponse (
  result : ResponseCommon (
    failed_codes : [
    ],
    latency_in_us : 12912
  ),
  vertex_schema : {
    248 : Schema (
        columns : [
          ColumnDef (
              name : "mobile",
              type : ValueType (
                type : STRING (6)
              )
            )
          ColumnDef (
              name : "update_time",
              type : ValueType (
                type : TIMESTAMP (21)
              )
            )
          ColumnDef (
              name : "create_time",
              type : ValueType (
                type : TIMESTAMP (21)
              )
            )
        ],
        schema_prop : SchemaProp (

        )
      )
  },
  vertex_data : [
  ],
  has_next : false,
  next_cursor : 
) vertices
Start to scan space graphx part 2
process ScanVertexResponse (
  result : ResponseCommon (
    failed_codes : [
    ],
    latency_in_us : 14528
  ),
  vertex_schema : {
    248 : Schema (
        columns : [
          ColumnDef (
              name : "mobile",
              type : ValueType (
                type : STRING (6)
              )
            )
          ColumnDef (
              name : "update_time",
              type : ValueType (
                type : TIMESTAMP (21)
              )
            )
          ColumnDef (
              name : "create_time",
              type : ValueType (
                type : TIMESTAMP (21)
              )
            )
        ],
        schema_prop : SchemaProp (

        )
      )
  },
  vertex_data : [
  ],
  has_next : false,
  next_cursor : 
) vertices
Start to scan space graphx part 3
process ScanVertexResponse (
  result : ResponseCommon (
    failed_codes : [
    ],
    latency_in_us : 11417
  ),
  vertex_schema : {
    248 : Schema (
        columns : [
          ColumnDef (
              name : "mobile",
              type : ValueType (
                type : STRING (6)
              )
            )
          ColumnDef (
              name : "update_time",
              type : ValueType (
                type : TIMESTAMP (21)
              )
            )
          ColumnDef (
              name : "create_time",
              type : ValueType (
                type : TIMESTAMP (21)
              )
            )
        ],
        schema_prop : SchemaProp (

        )
      )
  },
  vertex_data : [
  ],
  has_next : false,
  next_cursor : 
) vertices

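Have you changed the schema version?

What kind of operation changes the schema version?

That's a very old version; it would be an ALTER TAG/EDGE operation.

It looks like ALTER operations were done. What can we do in that case? We need to export the 1.0 data and import it into version 3.6.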

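1. If the data volume is small, you can MATCH the data out and parse it into a CSV file.
2. Check the storaged log to see whether it is processing data.
3. In the code below, try adding a property to the property list for mobile:
Map<String, List<String>> returnCols = new HashMap<>();
returnCols.put("mobile", Arrays.asList());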
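For suggestion 3, a minimal sketch of the returnCols map with the tag's properties listed explicitly instead of an empty list. The property names are copied from vertex_schema 248 in the log above; the class and method names are hypothetical, and the idea that the client only uses this list when allCols is false is an assumption about nebula-java 1.0.0-rc4, not something confirmed in this thread.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReturnColsSketch {
    // Hypothetical helper: list the "mobile" tag's properties explicitly
    // (names taken from vertex_schema 248 in the scan log) instead of Arrays.asList().
    static Map<String, List<String>> explicitReturnCols() {
        Map<String, List<String>> returnCols = new HashMap<>();
        returnCols.put("mobile", Arrays.asList("mobile", "update_time", "create_time"));
        return returnCols;
    }

    public static void main(String[] args) {
        Map<String, List<String>> returnCols = explicitReturnCols();
        System.out.println(returnCols); // {mobile=[mobile, update_time, create_time]}
        // Assumption: pass it with allCols = false, using the same call as the test code:
        // storageClient.scanVertex(space, part, returnCols, false, 100, 0L, Long.MAX_VALUE);
    }
}
```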

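  1. The data volume is several TB, so MATCH probably won't work.
  2. There is no log output at all while the scan runs; I also looked at ScanVertexProcessor.cpp and it doesn't log anything.
  3. I already tried returnCols.put("mobile", Arrays.asList()); and still got no data.
  4. I also found that spaces that were never altered can't be scanned either.

How can this be fixed?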

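@steam, could someone from the storage team take a look?

Could you explain why, as mentioned earlier, changing the schema makes scan fail?

The concern was that the fetched schema might be stale and contain properties that no longer exist, so no data would match the property conditions. But that shouldn't be the case: even if the properties were wrong, the IDs would still be returned.

I went through the source. After the properties are altered, the binary data is still returned, but decoding can fail: because the tag now has several schema versions, the schemaVersion fetched from meta may not match the version stored in the row itself (for example after adding or removing fields, as mentioned above), and decoding then fails with a version mismatch. The relevant source: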

std::unique_ptr<RowReader> RowReader::getRowReader(
        const Slice& row,
        std::shared_ptr<const SchemaProviderIf> schema) {
    SchemaVer ver = getSchemaVer(row);
    CHECK_EQ(ver, schema->getVersion());
    return std::unique_ptr<RowReader>(new RowReader(row, std::move(schema)));
}
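To make the version-mismatch argument concrete, here is a toy Java model, not Nebula's code: each stored row carries the schema version it was written with, a strict decoder refuses any other version the way the CHECK_EQ above does (in C++ a failed glog CHECK aborts the process; the sketch throws instead), and a version-aware decoder looks up the schema that matches the row. The schema history, class names and values are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SchemaVersionSketch {
    // Hypothetical stored row: the schema version it was written with, plus its values.
    static final class Row {
        final int schemaVer;
        final List<Object> values;
        Row(int schemaVer, List<Object> values) { this.schemaVer = schemaVer; this.values = values; }
    }

    // Like CHECK_EQ(ver, schema->getVersion()): only decode with a schema of the same version.
    static Map<String, Object> decode(Row row, int schemaVer, List<String> columns) {
        if (row.schemaVer != schemaVer) {
            throw new IllegalStateException(
                "schema version mismatch: row=" + row.schemaVer + ", schema=" + schemaVer);
        }
        Map<String, Object> out = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            out.put(columns.get(i), row.values.get(i));
        }
        return out;
    }

    // Version-aware alternative: pick the schema matching the row's own version, not the latest.
    static Map<String, Object> decodeWithHistory(Row row, Map<Integer, List<String>> schemasByVer) {
        List<String> columns = schemasByVer.get(row.schemaVer);
        if (columns == null) {
            throw new IllegalArgumentException("unknown schema version " + row.schemaVer);
        }
        return decode(row, row.schemaVer, columns);
    }

    public static void main(String[] args) {
        // Hypothetical history: v0 had only "mobile"; an ALTER TAG added two timestamps in v1.
        Map<Integer, List<String>> schemas = Map.of(
            0, List.of("mobile"),
            1, List.of("mobile", "update_time", "create_time"));
        Row oldRow = new Row(0, List.of("v0-mobile"));
        try {
            decode(oldRow, 1, schemas.get(1)); // decoding with the latest schema only
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // schema version mismatch: row=0, schema=1
        }
        System.out.println(decodeWithHistory(oldRow, schemas)); // {mobile=v0-mobile}
    }
}
```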

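I found the problem by adding log statements and compiling from source: in ScanVertexProcessor::process the rows are dropped by the time filter. The TagVersion is 0, so ts becomes the maximum int64_t value, and the endTime I pass is also the maximum value, so ts >= endTime holds and every row is skipped. The question is: why is the key's tagVersion 0?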

// only return data within time range [start, end)
        TagVersion version = folly::Endian::big(NebulaKeyUtils::getVersion(key));
        int64_t ts = std::numeric_limits<int64_t>::max() - version;
        if (ts < startTime || ts >= endTime) {
            LOG(ERROR) << "ScanVertexProcessor::process, Out of time rage,vertex key:" << key << ", ts: " << ts << ", tagVersion: " << version;
            continue;
        }

Log output:

E1111 15:31:47.550460 88437 ScanVertexProcessor.cpp:86] ScanVertexProcessor::process, Out of time rage,vertex key:^A^A^@^@^@3^Kز<E8><A7><D8><F8>^@^@^@^@^@^@^@^@^@^@^@, ts: 9223372036854775807, tagVersion: 0
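The arithmetic behind this finding can be checked in isolation. Below is a minimal Java sketch of the same filter, assuming (as the formula ts = INT64_MAX - version implies) that a non-zero version stores INT64_MAX minus a write timestamp. Because the range [startTime, endTime) is half-open and endTime cannot exceed Long.MAX_VALUE, a row whose version is 0 gets ts = Long.MAX_VALUE and is skipped for every possible endTime, not only for the value passed in the test code. The class name and the sample write time are hypothetical.

```java
public class ScanTimeFilterSketch {
    // Mirrors: int64_t ts = std::numeric_limits<int64_t>::max() - version;
    static long ts(long version) {
        return Long.MAX_VALUE - version;
    }

    // Mirrors: if (ts < startTime || ts >= endTime) continue;  (keep only [start, end))
    static boolean skipped(long version, long startTime, long endTime) {
        long ts = ts(version);
        return ts < startTime || ts >= endTime;
    }

    public static void main(String[] args) {
        // The client test passes startTime = 0L and endTime = Long.MAX_VALUE.
        System.out.println(skipped(0L, 0L, Long.MAX_VALUE)); // true: ts == Long.MAX_VALUE
        // Hypothetical version written as INT64_MAX - writeTime: ts == writeTime, kept.
        long writeTime = 1_600_000_000_000_000L;
        System.out.println(skipped(Long.MAX_VALUE - writeTime, 0L, Long.MAX_VALUE)); // false
    }
}
```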

Why does NebulaKeyUtils::getVersion return 0? My log shows offset = 16 and rawKey.size() = 24, so why is the value read at rawKey.data() + offset equal to 0? I don't understand.

static int64_t getVersion(const folly::StringPiece& rawKey) {
        CHECK(isVertex(rawKey) || isEdge(rawKey));
        auto offset = rawKey.size() - sizeof(int64_t);
        return readInt<int64_t>(rawKey.data() + offset, sizeof(int64_t));
    }
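A small Java sketch of the same read, applied to our byte-by-byte reading of the key printed in the log (assuming `ز` is the terminal's rendering of bytes D8 B2 and the line break in that log line is only a wrap). Under an assumed v1 vertex key layout of a 4-byte part/type header, an 8-byte vertex ID, a 4-byte tag ID and an 8-byte version, the 24-byte key gives offset 24 - 8 = 16, bytes 12 to 15 decode little-endian to 248 (the tag ID in vertex_schema), and the last 8 bytes are all zero. That suggests the row was stored with version 0 rather than being misread. The class and helper names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class VertexKeySketch {
    // Assumed v1 vertex key layout (24 bytes):
    // [0..3] part/type header, [4..11] vertex ID, [12..15] tag ID, [16..23] version.
    static final int VERSION_SIZE = Long.BYTES;

    // Our reading of the key in the storaged log (^A = 0x01, ^@ = 0x00, ^K = 0x0B, '3' = 0x33).
    static byte[] loggedKey() {
        int[] raw = {0x01, 0x01, 0x00, 0x00, 0x00, 0x33, 0x0B, 0xD8, 0xB2, 0xE8, 0xA7, 0xD8,
                     0xF8, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
        byte[] key = new byte[raw.length];
        for (int i = 0; i < raw.length; i++) {
            key[i] = (byte) raw[i];
        }
        return key;
    }

    // Tag ID as a little-endian int32 at offset 12 (assumed layout).
    static int tagId(byte[] key) {
        return ByteBuffer.wrap(key, 12, Integer.BYTES).order(ByteOrder.LITTLE_ENDIAN).getInt();
    }

    // Same offset as getVersion: rawKey.size() - sizeof(int64_t). Read big-endian,
    // which matches readInt (native little-endian) followed by folly::Endian::big.
    static long version(byte[] key) {
        int offset = key.length - VERSION_SIZE;
        return ByteBuffer.wrap(key, offset, VERSION_SIZE).getLong();
    }

    public static void main(String[] args) {
        byte[] key = loggedKey();
        System.out.println("size=" + key.length + " offset=" + (key.length - VERSION_SIZE)); // size=24 offset=16
        System.out.println("tagId=" + tagId(key));     // tagId=248
        System.out.println("version=" + version(key)); // version=0
    }
}
```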

@江一旺

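I used the Python storage client to try to reproduce the case where data can't be scanned after an ALTER of the schema; it seems db_dump can still scan the data. The reproduction and the scanning method are here:

https://gist.github.com/wey-gu/81417425bb1ed39f8b219fc3debc79b9

Take a look.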

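Could you scan it with the Python client? It's slower, but an interpreted language makes it easy to interact, make changes and debug.

Try deleting these lines; I vaguely remember this was changed later.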

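I've already tried that: when this step is skipped, the data can be read.

Should I just comment out these lines, compile nebula myself and replace the version running in production? I'm not sure whether that causes problems. I also noticed the nebula build has optimization options; would building it this way hurt performance? And could the scan then return extra data?

The build documentation I followed: 1.2.0 build documentation

I can switch to Python, but I don't think it's a client problem, because other spaces can be scanned normally.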