
Enterprise Search Engine Development: The Connector (Part 27)

2017-07-14 10:28:20 admin

Tags: des   style   class   blog   code   http

The ChangeQueue class implements the ChangeSource interface. That interface declares a single method for pulling the next Change object:

```java
/**
 * A source of {@link Change} objects.
 *
 * @since 2.8
 */
public interface ChangeSource {
  /**
   * @return the next change or {@code null} if there is no change available
   */
  public Change getNextChange();
}
```

Each ChangeQueue instance creates a blocking queue, private final BlockingQueue<Change> pendingChanges, which serves as the container for Change objects:

```java
/**
 * Initializes the blocking queue pendingChanges.
 *
 * @param size
 * @param sleepInterval
 * @param introduceDelayAfterEachScan
 * @param activityLogger
 */
private ChangeQueue(int size, long sleepInterval,
    boolean introduceDelayAfterEachScan, CrawlActivityLogger activityLogger) {
  pendingChanges = new ArrayBlockingQueue<Change>(size);
  this.sleepInterval = sleepInterval;
  this.activityLogger = activityLogger;
  this.introduceDelayAfterEveryScan = introduceDelayAfterEachScan;
}
```

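The introduceDelayAfterEachScan argument is stored in the introduceDelayAfterEveryScan field. It controls whether a delay is inserted after each complete pass over the data.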

The previous article showed that the inner class CallBack adds submitted data to the blocking queue BlockingQueue<Change> pendingChanges.

ChangeQueue's implementation of the ChangeSource method does the reverse: it retrieves Change objects from that blocking queue.

```java
/**
 * Takes an element from the blocking queue pendingChanges.
 * Gets the next available change from the ChangeQueue.  Will wait up to
 * 1/4 second for a change to appear if none is immediately available.
 *
 * @return the next available change or {@code null} if no changes are
 *         available
 */
public Change getNextChange() {
  try {
    return pendingChanges.poll(250L, TimeUnit.MILLISECONDS);
  } catch (InterruptedException ie) {
    return null;
  }
}
```
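The hand-off above is an ordinary bounded producer/consumer queue. The minimal sketch below shows that behaviour outside the connector code, using java.util.concurrent directly. The class name ChangeBuffer and the use of String in place of Change are hypothetical simplifications. Only the ArrayBlockingQueue and the 250 ms poll mirror the original.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ChangeQueue; String replaces Change.
class ChangeBuffer {
  private final BlockingQueue<String> pending;

  ChangeBuffer(int size) {
    // Bounded queue: put() blocks the producer once 'size' items are waiting.
    pending = new ArrayBlockingQueue<String>(size);
  }

  // Producer side (the monitor thread's callback in the article).
  void add(String change) {
    try {
      pending.put(change);
    } catch (InterruptedException ie) {
      // Simplification: drop the change and restore the interrupt flag.
      Thread.currentThread().interrupt();
    }
  }

  // Consumer side: wait at most 250 ms, then return null.
  String getNextChange() {
    try {
      return pending.poll(250L, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ie) {
      // Restore the interrupt flag so callers can still observe it.
      Thread.currentThread().interrupt();
      return null;
    }
  }
}
```

The sketch differs from the original in one deliberate way. Before returning null, it restores the thread's interrupt flag, whereas the original getNextChange() simply swallows the InterruptedException. Restoring the flag is the usual idiom because it lets code further up the call stack notice the interruption.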

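The ChangeQueue object acts as a buffer for Change objects. As the previous article showed, Change objects are added to it by the thread started for the monitor object DocumentSnapshotRepositoryMonitor.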

Which object, then, calls ChangeQueue's getNextChange() method to take the Change objects out?

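Tracing the calls leads to the loadUpFromChangeSource method of the CheckpointAndChangeQueue class, which calls getNextChange(). That method wraps each retrieved Change object in a CheckpointAndChange object and adds it to the member field List<CheckpointAndChange> checkpointAndChangeList.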

First, here are the relevant member fields and the constructor:

```java
private final AtomicInteger maximumQueueSize =
    new AtomicInteger(DEFAULT_MAXIMUM_QUEUE_SIZE);
private final List<CheckpointAndChange> checkpointAndChangeList;
private final ChangeSource changeSource;
private final DocumentHandleFactory internalDocumentHandleFactory;
private final DocumentHandleFactory clientDocumentHandleFactory;
private volatile DiffingConnectorCheckpoint lastCheckpoint;
private final File persistDir;  // place to persist enqueued values
private MonitorRestartState monitorPoints = new MonitorRestartState();

public CheckpointAndChangeQueue(ChangeSource changeSource, File persistDir,
    DocumentHandleFactory internalDocumentHandleFactory,
    DocumentHandleFactory clientDocumentHandleFactory) {
  this.changeSource = changeSource;
  this.checkpointAndChangeList = Collections.synchronizedList(
      new ArrayList<CheckpointAndChange>(maximumQueueSize.get()));
  this.persistDir = persistDir;
  this.internalDocumentHandleFactory = internalDocumentHandleFactory;
  this.clientDocumentHandleFactory = clientDocumentHandleFactory;
  ensurePersistDirExists();
}
```

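Among other things, the constructor initializes two fields. The first is the ChangeSource object changeSource, which is the ChangeQueue object. The second is the list container List<CheckpointAndChange> checkpointAndChangeList, wrapped with Collections.synchronizedList.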

Now back to the loadUpFromChangeSource method:

```java
/**
 * Pulls Change objects from the ChangeSource and adds them to
 * checkpointAndChangeList.
 */
private void loadUpFromChangeSource() {
  int max = maximumQueueSize.get();
  if (checkpointAndChangeList.size() < max) {
    lastCheckpoint = lastCheckpoint.nextMajor();
  }
  while (checkpointAndChangeList.size() < max) {
    Change newChange = changeSource.getNextChange();
    if (newChange == null) {
      break;
    }
    lastCheckpoint = lastCheckpoint.next();
    checkpointAndChangeList.add(new CheckpointAndChange(lastCheckpoint, newChange));
  }
}
```

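The method's main job is to take Change objects from changeSource, wrap each one in a CheckpointAndChange object, and add it to the List<CheckpointAndChange> checkpointAndChangeList container. It stops when the list reaches maximumQueueSize or when the source returns null. If there is room in the list, lastCheckpoint first advances to the next major checkpoint. Each change after that receives the next checkpoint in sequence.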
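The sketch below illustrates the checkpoint numbering in loadUpFromChangeSource. It makes a hypothetical assumption: a checkpoint is a (major, minor) pair, where nextMajor() starts a new batch with minor reset to 0 and next() numbers each change inside the batch. The real DiffingConnectorCheckpoint may encode this differently. A Deque stands in for the ChangeSource because its poll() also returns null when nothing is available. The names SketchCheckpoint and ChangeLoader are illustrative only.

```java
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified checkpoint: a (major, minor) pair.
final class SketchCheckpoint {
  final long major;
  final long minor;

  SketchCheckpoint(long major, long minor) {
    this.major = major;
    this.minor = minor;
  }

  // Start a new batch (assumed: minor resets to 0).
  SketchCheckpoint nextMajor() { return new SketchCheckpoint(major + 1, 0); }

  // Next change within the same batch.
  SketchCheckpoint next() { return new SketchCheckpoint(major, minor + 1); }

  @Override public String toString() { return major + "/" + minor; }
}

final class ChangeLoader {
  // Same shape as loadUpFromChangeSource; returns the updated checkpoint.
  static SketchCheckpoint loadUp(Deque<String> source, List<String> out,
      SketchCheckpoint last, int max) {
    if (out.size() < max) {
      last = last.nextMajor();
    }
    while (out.size() < max) {
      String change = source.poll();  // null when the source is empty
      if (change == null) {
        break;
      }
      last = last.next();
      out.add(last + " " + change);   // stands in for new CheckpointAndChange(...)
    }
    return last;
  }
}
```

Suppose the source holds a, b and c, max is 2, and the starting checkpoint is 0/0. The call then produces the entries "1/1 a" and "1/2 b" and leaves c in the source for the next call.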

loadUpFromChangeSource is called from the resume method. resume, in turn, is called from the constructor of the DiffingConnectorDocumentList class:

```java
/**
 * Gets the List<CheckpointAndChange> queue.
 * Returns an {@link Iterator} for currently available
 * {@link CheckpointAndChange} objects that occur after the passed in
 * checkpoint. The {@link String} form of a {@link DiffingConnectorCheckpoint}
 * passed in is produced by calling
 * {@link DiffingConnectorCheckpoint#toString()}. As a side effect Objects
 * up to and including the object with the passed in checkpoint are removed
 * from this queue.
 *
 * @param checkpointString null means return all {@link CheckpointAndChange}
 *        objects and a non null value means to return
 *        {@link CheckpointAndChange} objects with checkpoints after the
 *        passed in value.
 * @throws IOException if error occurs while manipulating recovery state
 */
synchronized List<CheckpointAndChange> resume(String checkpointString)
    throws IOException {
  // Remove the changes that have already been completed.
  removeCompletedChanges(checkpointString);
  // Pull Change objects from the ChangeSource into checkpointAndChangeList.
  loadUpFromChangeSource();
  // Update monitorPoints.
  monitorPoints.updateOnGuaranteed(checkpointAndChangeList);
  try {
    // Persist checkpointAndChangeList to a queue file;
    // every call to resume produces one new file.
    writeRecoveryState();
  } finally {
    // TODO: Enhance with mechanism that remembers
    // information about recovery files to avoid re-reading.
    // Remove redundant queue files (ones already consumed).
    removeExcessRecoveryState();
  }
  return getList();
}
```
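The javadoc describes a side effect: every entry up to and including the passed-in checkpoint is dropped. The sketch below illustrates that behaviour. It is a guess at what removeCompletedChanges does, not its actual code. The checkpoint is reduced to a single long, and the classes QueuedChange and CompletedRemover are hypothetical. As in the javadoc, a null checkpoint means nothing has been acknowledged yet, so the whole queue is kept.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical queue entry: the checkpoint reduced to one comparable number.
final class QueuedChange {
  final long checkpoint;
  final String change;

  QueuedChange(long checkpoint, String change) {
    this.checkpoint = checkpoint;
    this.change = change;
  }
}

final class CompletedRemover {
  // Drops every entry whose checkpoint is <= acked; null keeps everything.
  static void removeCompleted(List<QueuedChange> queue, Long acked) {
    if (acked == null) {
      return;
    }
    Iterator<QueuedChange> it = queue.iterator();
    while (it.hasNext()) {
      if (it.next().checkpoint <= acked) {
        it.remove();  // safe removal while iterating
      }
    }
  }
}
```

The sketch does no locking. In the original, resume itself is synchronized, so removal, reloading and persisting happen as a single step with respect to other synchronized methods of the queue.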

Once the List<CheckpointAndChange> checkpointAndChangeList container has been filled, its contents are persisted to a queue (recovery) file in JSON format:

```java
/**
 * Persists the queue as JSON.
 *
 * @throws IOException
 */
private void writeRecoveryState() throws IOException {
  // TODO(pjo): Move this method into RecoveryFile.
  File recoveryFile = new RecoveryFile(persistDir);
  FileOutputStream outStream = new FileOutputStream(recoveryFile);
  Writer writer = new OutputStreamWriter(outStream, Charsets.UTF_8);
  try {
    try {
      writeJson(writer);
    } catch (JSONException e) {
      throw IOExceptionHelper.newIOException("Failed writing recovery file.", e);
    }
    writer.flush();
    outStream.getFD().sync();
  } finally {
    writer.close();
  }
}
```
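writeRecoveryState follows the usual recipe for a durable write. It first flushes the character writer into the FileOutputStream. It then calls getFD().sync(), so the operating system commits the bytes to the storage device before the method returns. The minimal JDK-only sketch below shows that recipe. It makes three substitutions: java.nio.charset.StandardCharsets.UTF_8 replaces Guava's Charsets.UTF_8, a plain string replaces the JSON document, and the class name DurableWriter is hypothetical.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: write text to a file and force it to disk.
final class DurableWriter {
  static void writeDurably(File file, String content) throws IOException {
    try (FileOutputStream outStream = new FileOutputStream(file);
         Writer writer = new OutputStreamWriter(outStream, StandardCharsets.UTF_8)) {
      writer.write(content);
      // Move buffered characters into the FileOutputStream...
      writer.flush();
      // ...then ask the OS to commit the file's bytes to the device.
      outStream.getFD().sync();
    }
    // Resources close in reverse order: writer first, then outStream.
  }
}
```

Try-with-resources gives the same guarantee as the original's finally block: the writer and stream are always closed, even when the write or sync throws.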

Each queue file's name embeds the current system time, so files can be compared to find out which was created earlier:

```java
/**
 * A queue file whose embedded timestamps can be compared.
 * A File that has some of the recovery logic.
 *  Original recovery files' names contained a single nanosecond timestamp
 *  eg.  recovery.10220010065599398 .  These turned out to be flawed
 *  because nanosecond times can go "back in time" between JVM restarts.
 *  Updated recovery files' names contain a wall clock millis timestamp
 *  followed by an underscore followed by a nanotimestamp eg.
 *  recovery.702522216012_10220010065599398 .
 */
static class RecoveryFile extends File {
  final static long NO_TIME_AVAIL = -1;
  long milliTimestamp = NO_TIME_AVAIL;
  long nanoTimestamp;

  long parseTime(String s) throws IOException {
    try {
      return Long.parseLong(s);
    } catch (NumberFormatException e) {
      throw new LoggingIoException("Invalid recovery filename: "
          + getAbsolutePath());
    }
  }

  /**
   * Parses the timestamps embedded in the file name.
   *
   * @throws IOException
   */
  void parseOutTimes() throws IOException {
    try {
      String basename = getName();
      if (!basename.startsWith(RECOVERY_FILE_PREFIX)) {
        throw new LoggingIoException("Invalid recovery filename: "
            + getAbsolutePath());
      } else {
        String extension = basename.substring(RECOVERY_FILE_PREFIX.length());
        if (!extension.contains("_")) {  // Original name format.
          nanoTimestamp = parseTime(extension);
        } else {  // Updated name format.
          String timeParts[] = extension.split("_");
          if (2 != timeParts.length) {
            throw new LoggingIoException("Invalid recovery filename: "
                + getAbsolutePath());
          }
          milliTimestamp = parseTime(timeParts[0]);
          nanoTimestamp = parseTime(timeParts[1]);
        }
      }
    } catch (IndexOutOfBoundsException e) {
      throw new LoggingIoException("Invalid recovery filename: "
          + getAbsolutePath());
    }
  }

  RecoveryFile(File persistanceDir) throws IOException {
    super(persistanceDir, RECOVERY_FILE_PREFIX + System.currentTimeMillis()
        + "_" + System.nanoTime());
    parseOutTimes();
  }

  /**
   * Wraps an existing queue file given its absolute path.
   *
   * @param absolutePath
   * @throws IOException
   */
  RecoveryFile(String absolutePath) throws IOException {
    super(absolutePath);
    parseOutTimes();
  }

  boolean isOlder(RecoveryFile other) {
    boolean weHaveMillis = milliTimestamp != NO_TIME_AVAIL;
    boolean otherHasMillis = other.milliTimestamp != NO_TIME_AVAIL;
    boolean bothHaveMillis = weHaveMillis && otherHasMillis;
    boolean neitherHasMillis = (!weHaveMillis) && (!otherHasMillis);
    if (bothHaveMillis) {
      if (this.milliTimestamp < other.milliTimestamp) {
        return true;
      } else if (this.milliTimestamp > other.milliTimestamp) {
        return false;
      } else {
        return this.nanoTimestamp < other.nanoTimestamp;
      }
    } else if (neitherHasMillis) {
      return this.nanoTimestamp < other.nanoTimestamp;
    } else if (weHaveMillis) {  // and other doesn't; we are newer.
      return false;
    } else {  // other has millis; other is newer.
      return true;
    }
  }

  /** A delete method that logs failures. */
  public void logOnFailDelete() {
    boolean deleted = super.delete();
    if (!deleted) {
      LOG.severe("Failed to delete: " + getAbsolutePath());
    }
  }

  // TODO(pjo): Move more recovery logic into this class.
}
```
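The sketch below makes the ordering rule of isOlder concrete by applying the same logic to bare file names. The prefix value "recovery." is inferred from the examples in the javadoc, and the class RecoveryNames is hypothetical. Wall-clock milliseconds decide the order, and nanoseconds only break ties. This is because System.nanoTime() has an arbitrary origin that can change between JVM restarts. Any name in the new format counts as newer than an old-style name.

```java
// Hypothetical helper reproducing RecoveryFile.isOlder on bare names.
final class RecoveryNames {
  static final String PREFIX = "recovery.";  // assumed RECOVERY_FILE_PREFIX value
  static final long NO_TIME_AVAIL = -1;

  // Returns {millis, nanos}; millis is NO_TIME_AVAIL for old-style names.
  static long[] parse(String name) {
    if (!name.startsWith(PREFIX)) {
      throw new IllegalArgumentException("Invalid recovery filename: " + name);
    }
    String ext = name.substring(PREFIX.length());
    if (!ext.contains("_")) {
      return new long[] {NO_TIME_AVAIL, Long.parseLong(ext)};
    }
    String[] parts = ext.split("_");
    if (parts.length != 2) {
      throw new IllegalArgumentException("Invalid recovery filename: " + name);
    }
    return new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])};
  }

  static boolean isOlder(String a, String b) {
    long[] x = parse(a);
    long[] y = parse(b);
    boolean xHas = x[0] != NO_TIME_AVAIL;
    boolean yHas = y[0] != NO_TIME_AVAIL;
    if (xHas && yHas) {
      // Millis decide first; nanos only break ties.
      return x[0] != y[0] ? x[0] < y[0] : x[1] < y[1];
    } else if (!xHas && !yHas) {
      return x[1] < y[1];
    }
    // Exactly one name has millis: that one is the newer format, hence newer.
    return !xHas;
  }
}
```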

Next, let's see what the startup method, start, does:

```java
/**
 * Initialize to start processing from after the passed in checkpoint
 * or from the beginning if the passed in checkpoint is null.  Part of
 * making DocumentSnapshotRepositoryMonitorManager go from "cold" to "warm".
 */
public synchronized void start(String checkpointString) throws IOException {
  LOG.info("Starting CheckpointAndChangeQueue from " + checkpointString);
  // Create the queue directory.
  ensurePersistDirExists();
  checkpointAndChangeList.clear();
  lastCheckpoint = constructLastCheckpoint(checkpointString);
  if (null == checkpointString) {
    // Delete all queue files.
    removeAllRecoveryState();
  } else {
    RecoveryFile current = removeExcessRecoveryState();
    // Load monitorPoints and the checkpointAndChangeList queue.
    loadUpFromRecoveryState(current);
    //this.monitorPoints.points.entrySet();
  }
}
```

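In short, start loads the list of CheckpointAndChange objects from the previously saved queue file into the List<CheckpointAndChange> checkpointAndChangeList container, together with the MonitorCheckpoint objects. If the checkpoint string is null, it instead deletes all queue files and starts from the beginning.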

```java
/**
 * Loads the queue.
 *
 * @param file
 * @throws IOException
 */
private void loadUpFromRecoveryState(RecoveryFile file) throws IOException {
  // TODO(pjo): Move this method into RecoveryFile.
  new LoadingQueueReader().readJson(file);
}
```

CheckpointAndChangeQueue defines inner classes for this purpose. They load the list of CheckpointAndChange objects from the JSON file into the List<CheckpointAndChange> checkpointAndChangeList container.

First, the abstract queue-reader class AbstractQueueReader:

```java
/**
 * Abstract base class for loading the queue from a JSON file.
 * Reads JSON recovery files. Uses the Template Method pattern to
 * delegate what to do with the parsed objects to subclasses.
 *
 * Note: This class uses gson for streaming support.
 */
private abstract class AbstractQueueReader {
  public void readJson(File file) throws IOException {
    readJson(new BufferedReader(new InputStreamReader(
        new FileInputStream(file), Charsets.UTF_8)));
  }

  /**
   * Reads and parses the stream calling the abstract methods to
   * take whatever action is required. The given stream will be
   * closed automatically.
   *
   * @param reader the stream to parse
   */
  @VisibleForTesting
  void readJson(Reader reader) throws IOException {
    JsonReader jsonReader = new JsonReader(reader);
    try {
      readJson(jsonReader);
    } finally {
      jsonReader.close();
    }
  }

  /**
   * Reads and parses the stream calling the abstract methods to
   * take whatever action is required.
   */
  private void readJson(JsonReader reader) throws IOException {
    JsonParser parser = new JsonParser();
    reader.beginObject();
    while (reader.hasNext()) {
      String name = reader.nextName();
      if (name.equals(MONITOR_STATE_JSON_TAG)) {
        readMonitorPoints(parser.parse(reader));
      } else if (name.equals(QUEUE_JSON_TAG)) {
        reader.beginArray();
        while (reader.hasNext()) {
          readCheckpointAndChange(parser.parse(reader));
        }
        reader.endArray();
      } else {
        throw new IOException("Read invalid recovery file.");
      }
    }
    reader.endObject();

    reader.setLenient(true);
    String name = reader.nextString();
    if (!name.equals(SENTINAL)) {
      throw new IOException("Read invalid recovery file.");
    }
  }

  protected abstract void readMonitorPoints(JsonElement gson)
      throws IOException;

  protected abstract void readCheckpointAndChange(JsonElement gson)
      throws IOException;
}
```

Subclasses implement the abstract methods:

```java
/**
 * Checks that a queue file is valid.
 * Verifies that a JSON recovery file is valid JSON with a
 * trailing sentinel.
 */
private class ValidatingQueueReader extends AbstractQueueReader {
  protected void readMonitorPoints(JsonElement gson) throws IOException {
  }

  protected void readCheckpointAndChange(JsonElement gson)
      throws IOException {
  }
}

/**
 * Implementation that loads the queue from a JSON file.
 * Loads the queue from a JSON recovery file.
 *
 * TODO(jlacey): Change everything downstream to gson. For now we
 * reserialize the individual gson objects and deserialize them
 * using org.json.
 */
@VisibleForTesting
class LoadingQueueReader extends AbstractQueueReader {
  /**
   * Loads the MonitorRestartState checkpoints
   * (HashMap<String, MonitorCheckpoint> points).
   */
  protected void readMonitorPoints(JsonElement gson) throws IOException {
    try {
      JSONObject json = gsonToJson(gson);
      monitorPoints = new MonitorRestartState(json);
      //monitorPoints.updateOnGuaranteed(checkpointAndChangeList)
    } catch (JSONException e) {
      throw IOExceptionHelper.newIOException(
          "Failed reading persisted JSON queue.", e);
    }
  }

  /**
   * Loads checkpointAndChangeList.
   */
  protected void readCheckpointAndChange(JsonElement gson)
      throws IOException {
    try {
      JSONObject json = gsonToJson(gson);
      checkpointAndChangeList.add(new CheckpointAndChange(json,
          internalDocumentHandleFactory, clientDocumentHandleFactory));
    } catch (JSONException e) {
      throw IOExceptionHelper.newIOException(
          "Failed reading persisted JSON queue.", e);
    }
  }

  // TODO(jlacey): This could be much more efficient, especially
  // with LOBs, if we directly transformed the objects with a little
  // recursive parser. This code is only used when recovering failed
  // batches, so I don't know if that's worth the effort.
  private JSONObject gsonToJson(JsonElement gson) throws JSONException {
    return new JSONObject(gson.toString());
  }
}
```
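The AbstractQueueReader hierarchy is an instance of the Template Method pattern. The base class owns the parsing and the trailing-sentinel check, and subclasses only decide what to do with each parsed record. The sketch below reproduces that structure without gson. Its line-based format is purely hypothetical and is not the connector's JSON layout: "M" marks a monitor point, "Q" marks a queue entry, and a final END line is the sentinel. Records reach the hooks while the file is still being streamed, so a truncated file is detected only at the end. That is presumably why the original also has a ValidatingQueueReader whose hooks do nothing.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;

// Template Method sketch: read() is the fixed algorithm; the two
// protected hooks are the steps subclasses fill in.
abstract class QueueReaderSketch {
  // Hypothetical sentinel; the real code compares against SENTINAL.
  static final String SENTINEL = "END";

  final void read(Reader in) throws IOException {
    try (BufferedReader reader = new BufferedReader(in)) {
      boolean sawSentinel = false;
      String line;
      while ((line = reader.readLine()) != null) {
        if (sawSentinel) {
          throw new IOException("Data after sentinel.");
        } else if (line.equals(SENTINEL)) {
          sawSentinel = true;
        } else if (line.startsWith("M ")) {
          readMonitorPoint(line.substring(2));
        } else if (line.startsWith("Q ")) {
          readCheckpointAndChange(line.substring(2));
        } else {
          throw new IOException("Read invalid recovery file.");
        }
      }
      // A missing sentinel means the file was cut short, e.g. by a crash mid-write.
      if (!sawSentinel) {
        throw new IOException("Read invalid recovery file.");
      }
    }
  }

  protected abstract void readMonitorPoint(String record) throws IOException;

  protected abstract void readCheckpointAndChange(String record)
      throws IOException;
}

// Structure check only: the hooks deliberately do nothing.
class ValidatingReaderSketch extends QueueReaderSketch {
  @Override protected void readMonitorPoint(String record) {}
  @Override protected void readCheckpointAndChange(String record) {}
}

// Collects the parsed records, as LoadingQueueReader fills its fields.
class LoadingReaderSketch extends QueueReaderSketch {
  final List<String> monitorPoints = new ArrayList<>();
  final List<String> queue = new ArrayList<>();

  @Override protected void readMonitorPoint(String record) {
    monitorPoints.add(record);
  }

  @Override protected void readCheckpointAndChange(String record) {
    queue.add(record);
  }
}
```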

---------------------------------------------------------------------------

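This series, Enterprise Search Engine Development: The Connector, is my original work.

When reposting, please credit the source: cnblogs (博客园), 刺猬的温驯

My email: [email protected]#com (replace # with .)

Link to this article: http://www.cnblogs.com/chenying99/p/3789560.html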



Article source: http://www.bozhiyue.com/yuyan/2017/0714/1484262.html