提问参考模版:
- nebula 版本:3.4.0
- 部署方式: 分布式
- 安装方式:源码编译
- 是否上生产环境:N
- 硬件信息
- 磁盘
- CPU、内存信息(暂时未知)
- 问题的具体描述
用java客户端插入思只开源的1.4亿知识图谱,插入丢数据。
第一步:用https://github.com/jievince/rdf-converter清理, 得到edge.csv和vertex.csv。
第二步:用java客户端操作入库:
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>client</artifactId>
<version>3.5.0</version>
</dependency>
@GetMapping("/insertVertex")
public String insertVertex() throws Exception {
long start = System.currentTimeMillis();
FileInputStream fis = new FileInputStream(vertexPath);
InputStreamReader isr = new InputStreamReader(fis, StandardCharsets.UTF_8);
BufferedReader br = new BufferedReader(isr);
int cnt = 0;
List<String> ids = new ArrayList<>();
Set<String> idsSet = new HashSet<>();
StringBuilder sb = new StringBuilder();
sb.append("USE xw; INSERT VERTEX IF NOT EXISTS entity(name) VALUES ");
String line = null;
while ((line = br.readLine()) != null) {
cnt = cnt + 1;
if (cnt % 50000 == 0) {
log.info("已处理:" + cnt);
}
try {
String entity = line.trim();
String id = Md5Utils.encrypt(entity);
if (idsSet.contains(id)) {
continue;
} else {
idsSet.add(id);
}
sb.append(" ")
.append("'")
.append(id)
.append("'")
.append(":")
.append("('")
.append(entity)
.append("')")
.append(",");
ids.add(id);
if (ids.size() >= 100) {
//如果最后是逗号, 则删除
if (sb.charAt(sb.length() - 1) == ',') {
sb.deleteCharAt(sb.length() - 1).append(";");
}
graphCommonService.execute(sb.toString());
TimeUnit.MILLISECONDS.sleep(100);
sb = new StringBuilder();
sb.append("USE xw; INSERT VERTEX IF NOT EXISTS entity(name) VALUES ");
ids.clear();
}
} catch (Exception e) {
log.error("error:{}", e.getMessage());
}
}
//最后一波
if (ids.size() > 0) {
log.info("最后一波插入...");
//如果最后是逗号, 则删除
if (sb.charAt(sb.length() - 1) == ',') {
sb.deleteCharAt(sb.length() - 1).append(";");
}
graphCommonService.execute(sb.toString());
}
br.close();
isr.close();
fis.close();
log.info("id size:{}", idsSet.size());
log.info("插入定点结束, 耗时:{}秒", (System.currentTimeMillis() - start) / 1000f);
return "ok";
}
graphCommonService.execute(sb.toString());最终调用的是Session中的方法
public synchronized ResultSet execute(String stmt) throws
IOErrorException {
return executeWithParameter(stmt,
(Map<String, Object>) Collections.EMPTY_MAP);
}
现象:
①第一次插入,batch size=500,没有sleep,得到42331356个entity,
②第二次插入,batch size=100,sleep 100毫秒,关闭compaction,得到43937172个entity。
但是它们都没有达到正确的entity的数量:
2023-06-16 07:31:37.359 INFO 37292 --- [nio-9062-exec-1] c.c.x.controller.OwnthinkController : 已处理:74000000
2023-06-16 07:31:40.173 INFO 37292 --- [nio-9062-exec-1] c.c.x.controller.OwnthinkController : 已处理:74050000
2023-06-16 07:31:43.277 INFO 37292 --- [nio-9062-exec-1] c.c.x.controller.OwnthinkController : 已处理:74100000
2023-06-16 07:31:46.183 INFO 37292 --- [nio-9062-exec-1] c.c.x.controller.OwnthinkController : 已处理:74150000
2023-06-16 07:31:47.988 INFO 37292 --- [nio-9062-exec-1] c.c.x.controller.OwnthinkController : 已处理:74200000
2023-06-16 07:31:49.775 INFO 37292 --- [nio-9062-exec-1] c.c.x.controller.OwnthinkController : 已处理:74250000
2023-06-16 07:32:02.897 INFO 37292 --- [nio-9062-exec-1] c.c.x.controller.OwnthinkController : 已处理:74300000
2023-06-16 07:32:18.476 INFO 37292 --- [nio-9062-exec-1] c.c.x.controller.OwnthinkController : 最后一波插入...
2023-06-16 07:32:18.484 INFO 37292 --- [nio-9062-exec-1] c.c.x.controller.OwnthinkController : id size:45651393
2023-06-16 07:32:18.485 INFO 37292 --- [nio-9062-exec-1] c.c.x.controller.OwnthinkController : 插入定点结束, 耗时:52089.234秒
正确的应该是45651393个entity。但是随着batch size的减少和sleep的增加,丢失的数据有所减少。
似乎丢数据是个must。
这里有两份最新的nebula日志:
①
nebula-storaged.k8s101.root.log.INFO.20230515-213149.19665 (943.9 KB)
②
nebula-storaged.k8s101.root.log.INFO.20230615-154018.17803 (49.5 KB)