用RAMDirectory做缓存提高Lucene性能
原文链接 http://codepub.cn/2017/06/28/Use-RAMDirectory-to-do-the-cache-to-improve-lucene-s-performance/
注:以下为加速网络访问所做的原文缓存,经过重新格式化,可能存在格式方面的问题,或偶有遗漏信息,请以原文为准。
RAMDirectory和FSDirectory都继承自BaseDirectory,而BaseDirectory继承自Directory,Directory是Lucene中设计的一个顶层抽象类,可以将其看做本地文件系统的一个目录。
RAMDirectory是基于内存实现的,具有较高的存储速度,但是受到内存大小的限制,而FSDirectory是基于文件系统实现的,针对不同的操作系统有不同的具体实现类,这些实现类无需用户操心,只需要调用FSDirectory.open(Path path)方法,它就会帮助我们选择最适合的子类,FSDirectory的瓶颈在于磁盘I/O。
如果机器内存足够大的话,那么组合使用RAMDirectory和FSDirectory将能够极大地提高Lucene的性能。组合使用两者的应用场景很多,不同的场景可以分别解决不同的需求,仅列举如下
- 批量索引,而无需检索的情况下,先把document存到RAMDirectory中,当达到一定数量之后,再把这些索引一次性加入FSDirectory里
import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.RAMDirectory;
import java.io.IOException;
import java.nio.file.Paths;
/**
* <p>
* Created by wangxu on 2017/06/19 15:31.
* </p>
* <p>
* Description: 基于Lucene 6.5开发
* </p>
*
* @author Wang Xu
* @version V1.0.0
* @since V1.0.0 <br/>
* WebSite: http://codepub.cn <br>
* Licence: Apache v2 License
*/
public class RamFSDirectoryDemo {
public static void main(String[] args) throws IOException {
RAMDirectory ramDirectory = new RAMDirectory();
IndexWriter ramWriter = new IndexWriter(ramDirectory, new IndexWriterConfig(new WhitespaceAnalyzer()).setOpenMode(IndexWriterConfig.OpenMode.CREATE));
FSDirectory fsDirectory = FSDirectory.open(Paths.get("F:/index_RAM_FS"));
IndexWriter fsWriter = new IndexWriter(fsDirectory, new IndexWriterConfig(new WhitespaceAnalyzer()).setOpenMode(IndexWriterConfig.OpenMode
.CREATE_OR_APPEND));
//简单起见,这里的数量都比较少,例如索引10000个document,每100个document作为一批
int tempCount = 0;
for (int i = 0; i < 10000; i++) {
Document document = new Document();
document.add(new StringField("key", String.valueOf(i), Field.Store.YES));
ramWriter.addDocument(document);
tempCount++;
if (tempCount % 100 == 0) {
ramWriter.commit();
ramWriter.close();
fsWriter.addIndexes(ramDirectory);
ramDirectory.close();
fsWriter.commit();
ramDirectory = new RAMDirectory();
ramWriter = new IndexWriter(ramDirectory, new IndexWriterConfig(new WhitespaceAnalyzer()).setOpenMode(IndexWriterConfig.OpenMode.CREATE));
}
}
//退出时确保RAMDirectory中内容都被加入本地
if (ramWriter != null) {
ramWriter.commit();
ramWriter.close();
fsWriter.addIndexes(ramDirectory);
ramDirectory.close();
}
IndexSearcher indexSearcher = new IndexSearcher(DirectoryReader.open(fsWriter.getDirectory()));
fsWriter.close();
int count = indexSearcher.count(new MatchAllDocsQuery());
System.out.println(count);
}
}
- 索引的同时需要搜索,前提是内存总量大于索引文件总量,如果要求新加入的索引对搜索可见,即实时搜索,要怎么做呢?显然实时搜索需要writer和searcher共用同一份索引,同时要定时的将内存中索引备份到文件系统,否则机器一旦宕机,内存中所有的索引文件都将丢失。代码实现也很简单,在备份的时候,可以使用调度线程去进行备份操作,同时还不影响主线程继续接受索引请求;备份策略有两种:全量和增量,增量直接比较文件名,将新增文件拷贝到文件系统,同时删除已过时的索引文件即可。
import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.*;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.RAMDirectory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* <p>
* Created by wangxu on 2017/06/19 15:31.
* </p>
* <p>
* Description: 基于Lucene 6.5开发
* </p>
*
* @author Wang Xu
* @version V1.0.0
* @since V1.0.0 <br/>
* WebSite: http://codepub.cn <br>
* Licence: Apache v2 License
*/
public class RamFSDirectoryDemo {
private static IndexWriter ramWriter;
private static RAMDirectory ramDirectory;
private static final Backup backup = new Backup();
public static void main(String[] args) throws IOException {
//将文件系统中索引文件加载到内存中
FSDirectory fsDirectory = FSDirectory.open(Paths.get("F:/index_RAM_FS"));
ramDirectory = new RAMDirectory(fsDirectory, IOContext.READONCE);
fsDirectory.close();
IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new WhitespaceAnalyzer());
indexWriterConfig.setIndexDeletionPolicy(new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()));
ramWriter = new IndexWriter(ramDirectory, indexWriterConfig);
//先添加一条记录
Document document = new Document();
document.add(new StringField("first", "first", Field.Store.YES));
ramWriter.addDocument(document);
ramWriter.commit();
//定时备份线程
ScheduledExecutorService backupThread = Executors.newSingleThreadScheduledExecutor();
backupThread.scheduleAtFixedRate(backup, 0, 2, TimeUnit.SECONDS);
//可以继续接受索引请求
document = new Document();
document.add(new StringField("key", "key", Field.Store.YES));
ramWriter.addDocument(document);
ramWriter.commit();
//等待索引拷贝完成
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
//接受搜索请求,可实现实时搜索
IndexSearcher indexSearcher = new IndexSearcher(DirectoryReader.open(ramWriter.getDirectory()));
int count = indexSearcher.count(new MatchAllDocsQuery());
System.out.println("total hits: " + count);
System.out.println("内存中索引文件:" + Arrays.asList(ramDirectory.listAll()));
//查看备份索引是否完整,此处有一个注意点,如果需要在备份索引上打开searcher,那么在备份索引文件的时候需要先备份其它文件,最后再备份segments_N文件
//因为开searcher的时候,会先加载segments_N文件,这种方式可以保证加载完segments_N文件之后,再加载其它文件一定成功
fsDirectory = FSDirectory.open(Paths.get("F:/index_RAM_FS"));
System.out.println("备份中索引文件:" + Arrays.asList(fsDirectory.listAll()));
backupThread.shutdown();
}
static class Backup implements Runnable {
public void run() {
IndexWriterConfig config;
SnapshotDeletionPolicy indexDeletionPolicy = null;
IndexCommit snapshot = null;
try {
ramWriter.commit();
config = (IndexWriterConfig) ramWriter.getConfig();
indexDeletionPolicy = (SnapshotDeletionPolicy) config.getIndexDeletionPolicy();
snapshot = indexDeletionPolicy.snapshot();
config.setIndexCommit(snapshot);
Collection<String> fileNames = snapshot.getFileNames();
Path path = Paths.get("F:/index_RAM_FS");
//全量增量任选一种即可
boolean b = incrementalBackup(fileNames, path);
//boolean b = fullBackup(fileNames, path);
if (!b) {
System.err.println("Backup occurs error!");
}
} catch (IOException e) {
e.printStackTrace();
System.err.println(e.getMessage());
} finally {
try {
indexDeletionPolicy.release(snapshot);
} catch (IOException e) {
e.printStackTrace();
System.err.println(e.getMessage());
}
}
}
private boolean fullBackup(Collection<String> fileNames, Path path) {
Objects.requireNonNull(path);
Directory to;
try {
to = FSDirectory.open(path);
// 全量备份,直接清空拷贝
for (File file : path.toFile().listFiles()) {
file.delete();
}
for (String fileName : fileNames) {
to.copyFrom(ramDirectory, fileName, fileName, IOContext.DEFAULT);
}
to.close();
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
private boolean incrementalBackup(Collection<String> fileNames, Path path) {
// 增量备份,稍微复杂一些,比较文件名,将ramDirectory中新增索引拷贝过去,同时将ramDirectory中不存在的索引但是在path中存在的旧索引删除
Objects.requireNonNull(path);
//fileNames被IndexCommit引用,需要重新构造set集合,进行移除操作
Set<String> files = new HashSet<>(fileNames);
for (File file : path.toFile().listFiles()) {
if (files.contains(file.getName())) {
//该索引已存在,则不拷贝
files.remove(file.getName());
} else {
//删除已经过时的索引
file.delete();
}
}
//拷贝全部新增索引
try {
Directory to = FSDirectory.open(path);
for (String file : files) {
to.copyFrom(ramDirectory, file, file, IOContext.DEFAULT);
}
to.close();
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
}
}
如果内存不够大,不能够存放全部的本地文件系统索引,那么如何实现呢?这时候,需要两套IndexWriter,一套用来处理本地文件系统中的索引,另一套在内存中可以接受新的索引请求;对索引的操作无非就是新增、删除、更新这三种,下面分别论述
- 删除索引: 对于删除操作比较简单,两个IndexWriter都执行一次即可,如果其中一个IndexWriter中不存在待删除的索引的话,那么对索引文件不会有任何影响;这时候又分两种情况,一是待删除的索引在内存中,那么删除-提交-重新打开索引,耗时很短;二是待删除索引在硬盘上,这时候删除-提交,到底要不要重新打开,需要视业务而定,因为硬盘上的索引通常是很大的,如果频繁删除频繁重新打开的话,是很耗性能的,而如果不重新打开,这时候删除的索引对searcher是不可见的,也就是说用户仍然可以搜索到已删除的索引,比较好的方式是用一个后台线程去定时的重新打开硬盘索引,然后更新searcher即可,但是这样对于删除操作无法做到准实时,会有一定的延时。
- 新增索引:所有的新增操作都使用内存中的IndexWriter(称为A),然后提交-重新打开索引-更新searcher,一切都是在内存中操作,耗时极短,对于实时搜索是解决了,而且Lucene中已经提供了MultiReader可以组合多个子reader,很适合这种情况;但是当内存中索引尺寸达到一定大小之后,需要将其合并到文件系统中;在合并的时候还需要另外再开一个内存中的IndexWriter(称为B)用来接受索引合并期间的索引新增操作。不过这种方式实现起来很复杂,需要处理很多问题,例如在内存索引 A 和硬盘索引合并期间如果有更新删除操作怎么处理?在合并之后需要更新MultiReader,但是旧的MultiReader上面还挂有搜索请求怎么办?在新开一个内存索引 B 之后,如何让MultiReader覆盖住它?因为在 B 中必须先有索引,然后才能开Reader,进而才能更新MultiReader;若新增索引速度实在太快,在合并过程没有完成的时候,内存索引又满了,要怎么办?除此以外还有很多问题需要考虑。
- 更新索引:更新其实可以看作是两步操作,先删除后新增,这两步可以借鉴上面的论述。更新操作是无法同时应用于两个IndexWriter的,因为在Lucene中更新的逻辑是这样的,如果存在则更新,如果不存在则新增,那么假设一个IndexWriter中存在,另一个不存在,如果两个都应用更新,那么最后的结果很简单就是存在两份一样的document了。
个人理解优雅的程序应该是简单的,越是简单的程序其实越是健壮,这种方案实现起来很复杂,即使这么复杂的程序实现了,其健壮性仍然值得担忧,所以不是特别推荐使用这种方案。比较好的方案当然是上Elasticsearch集群了,Elasticsearch的刷新时间默认是1s钟,也就是说,最迟1s之后,就可以看到新的数据。当然如果非要针对Lucene进行开发,可以参考Linked公司开源的Zoie搜索引擎。