Enterprise Search Engine Development: The Connector (Part 28)

2017-07-14 09:41:22 admin

Typically, each SnapshotRepository object corresponds to one DocumentSnapshotRepositoryMonitor object and to one snapshot store object. The monitor manager, DocumentSnapshotRepositoryMonitorManagerImpl, ties them together.

To see what behavior DocumentSnapshotRepositoryMonitorManagerImpl must implement, first look at the methods specified by the interface it implements, DocumentSnapshotRepositoryMonitorManager:

/**
 * Management interface to {@link DocumentSnapshotRepositoryMonitor} threads.
 *
 * @since 2.8
 */
public interface DocumentSnapshotRepositoryMonitorManager {
  /**
   * Ensures all monitor threads are running.
   *
   * @param checkpoint for the last completed document or null if none have
   *        been completed.
   * @throws RepositoryException
   */
  void start(String checkpoint) throws RepositoryException;

  /**
   * Stops all the configured {@link DocumentSnapshotRepositoryMonitor} threads.
   */
  void stop();

  /**
   * Removes persisted state for {@link DocumentSnapshotRepositoryMonitor}
   * threads. After calling this {@link DocumentSnapshotRepositoryMonitor}
   * threads will no longer be able to resume from where they left off last
   * time.
   */
  void clean();

  /**
   * Returns the number of {@link DocumentSnapshotRepositoryMonitor} threads
   * that are alive. This method is for testing purposes.
   */
  int getThreadCount();

  /**
   * Returns the {@link CheckpointAndChangeQueue} for this
   * {@link DocumentSnapshotRepositoryMonitorManager}
   */
  CheckpointAndChangeQueue getCheckpointAndChangeQueue();

  /** Returns whether we are after a start() call and before a stop(). */
  boolean isRunning();

  /**
   * Receives information specifying what is guaranteed to be delivered to GSA.
   * Every entry in passed in Map is a monitor name and MonitorCheckpoint.
   * The monitor of that name can expect that all documents before and including
   * document related with MonitorCheckpoint will be delivered to GSA.
   * This information is for the convenience and efficiency of the Monitor so
   * that it knows how many changes it has to resend.  It's valid for a monitor
   * to ignore these updates if it feels like it for some good reason.
   * FileConnectorSystemMonitor instances use this information to trim their
   * file system snapshots.
   */
  void acceptGuarantees(Map<String, MonitorCheckpoint> guarantees);

  /**
   * Receives {@link TraversalSchedule} from TraversalManager which is
   * {@link TraversalScheduleAware}.
   */
  void setTraversalSchedule(TraversalSchedule traversalSchedule);
}

Next, let's look at how the DocumentSnapshotRepositoryMonitorManagerImpl class implements the behavior defined in that interface.

First, the relevant fields and how they are initialized:

private volatile TraversalSchedule traversalSchedule;

// Monitor threads
private final List<Thread> threads =
    Collections.synchronizedList(new ArrayList<Thread>());

// Map from monitor name to monitor
private final Map<String, DocumentSnapshotRepositoryMonitor> fileSystemMonitorsByName =
    Collections.synchronizedMap(
        new HashMap<String, DocumentSnapshotRepositoryMonitor>());

private boolean isRunning = false;  // Monitor threads start in off state.

private final List<? extends SnapshotRepository<? extends DocumentSnapshot>>
    repositories;
private final File snapshotDir;
private final ChecksumGenerator checksumGenerator;

// Container of CheckpointAndChange objects (a List)
private final CheckpointAndChangeQueue checkpointAndChangeQueue;

// Container of Change objects (a blocking queue)
private final ChangeQueue changeQueue;

private final DocumentSnapshotFactory documentSnapshotFactory;

/**
 * Constructs {@link DocumentSnapshotRepositoryMonitorManagerImpl}
 * for the {@link DiffingConnector}.
 *
 * @param repositories a {@code List} of {@link SnapshotRepository
 *        SnapshotRepositorys}
 * @param documentSnapshotFactory a {@link DocumentSnapshotFactory}
 * @param snapshotDir directory to store {@link SnapshotRepository}
 * @param checksumGenerator a {@link ChecksumGenerator} used to
 *        detect changes in a document's content
 * @param changeQueue a {@link ChangeQueue}
 * @param checkpointAndChangeQueue a
 *        {@link CheckpointAndChangeQueue}
 */
public DocumentSnapshotRepositoryMonitorManagerImpl(
    List<? extends SnapshotRepository<? extends DocumentSnapshot>> repositories,
    DocumentSnapshotFactory documentSnapshotFactory,
    File snapshotDir, ChecksumGenerator checksumGenerator,
    ChangeQueue changeQueue,
    CheckpointAndChangeQueue checkpointAndChangeQueue) {
  this.repositories = repositories;
  this.documentSnapshotFactory = documentSnapshotFactory;
  this.snapshotDir = snapshotDir;
  this.checksumGenerator = checksumGenerator;
  this.changeQueue = changeQueue;
  this.checkpointAndChangeQueue = checkpointAndChangeQueue;
}

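Now for its start method. It does three main things: it calls the start method of the checkpointAndChangeQueue object, initializes the SnapshotStore associated with each repository object, and finally starts the monitor instance for each repository object.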

/**
 * Start method.
 */
/** Go from "cold" to "warm" including CheckpointAndChangeQueue. */
public void start(String connectorManagerCheckpoint)
    throws RepositoryException {
  try {
    // Start the queue (main action: load monitorPoints and the
    // checkpointAndChangeList queue from the JSON queue file)
    checkpointAndChangeQueue.start(connectorManagerCheckpoint);
  } catch (IOException e) {
    throw new RepositoryException("Failed starting CheckpointAndChangeQueue.",
        e);
  }

  // Container of MonitorCheckpoint objects
  Map<String, MonitorCheckpoint> monitorPoints
      = checkpointAndChangeQueue.getMonitorRestartPoints();

  Map<String, SnapshotStore> snapshotStores = null;

  // Load the map from monitorName to SnapshotStore
  try {
    snapshotStores =
        recoverSnapshotStores(connectorManagerCheckpoint, monitorPoints);
  } catch (SnapshotStoreException e) {
    throw new RepositoryException("Snapshot recovery failed.", e);
  } catch (IOException e) {
    throw new RepositoryException("Snapshot recovery failed.", e);
  } catch (InterruptedException e) {
    throw new RepositoryException("Snapshot recovery interrupted.", e);
  }

  // Start the monitor threads
  startMonitorThreads(snapshotStores, monitorPoints);
  isRunning = true;
}
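One detail of this control flow is easy to miss: isRunning is set to true only after every step has succeeded, and an IOException from the queue is wrapped in a RepositoryException before it leaves the method. The following is a minimal sketch of just that flow. The class StartSequenceSketch, its nested RepositoryException and Queue types, and the steps list are hypothetical stand-ins introduced for illustration, not the connector's own classes.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class StartSequenceSketch {
  /** Hypothetical stand-in for the connector's RepositoryException. */
  static class RepositoryException extends Exception {
    RepositoryException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Hypothetical stand-in for CheckpointAndChangeQueue; only start() is modeled. */
  interface Queue {
    void start(String checkpoint) throws IOException;
  }

  private final Queue queue;
  private boolean isRunning = false;
  final List<String> steps = new ArrayList<String>();

  StartSequenceSketch(Queue queue) {
    this.queue = queue;
  }

  void start(String checkpoint) throws RepositoryException {
    try {
      queue.start(checkpoint);                 // step 1: start the queue
      steps.add("queue started");
    } catch (IOException e) {
      // Wrap the low-level failure, as the original method does.
      throw new RepositoryException("Failed starting CheckpointAndChangeQueue.", e);
    }
    steps.add("snapshot stores recovered");    // step 2 (placeholder)
    steps.add("monitor threads started");      // step 3 (placeholder)
    isRunning = true;                          // only reached if nothing threw
  }

  boolean isRunning() {
    return isRunning;
  }

  /** Returns whether the manager reports running after one start attempt. */
  static boolean runningAfterStart(final boolean queueFails) {
    StartSequenceSketch manager = new StartSequenceSketch(checkpoint -> {
      if (queueFails) {
        throw new IOException("queue file unreadable");
      }
    });
    try {
      manager.start(null);
    } catch (RepositoryException e) {
      // Expected when the queue fails; isRunning stays false.
    }
    return manager.isRunning();
  }

  public static void main(String[] args) {
    System.out.println(runningAfterStart(false));
    System.out.println(runningAfterStart(true));
  }
}
```

In this sketch a failed queue start leaves the manager in its "off" state, which matches the comment on the isRunning field in the original class.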

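When the SnapshotStore for each repository is initialized, the associated MonitorCheckpoint instance is passed in as well, and the snapshot files are repaired if necessary.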

/* For each start path gets its monitor recovery files in state were monitor
 * can be started. */
/**
 * Loads the map from monitorName to SnapshotStore.
 * @param connectorManagerCheckpoint
 * @param monitorPoints
 * @return
 * @throws IOException
 * @throws SnapshotStoreException
 * @throws InterruptedException
 */
private Map<String, SnapshotStore> recoverSnapshotStores(
    String connectorManagerCheckpoint, Map<String,
    MonitorCheckpoint> monitorPoints)
    throws IOException, SnapshotStoreException, InterruptedException {
  Map<String, SnapshotStore> snapshotStores =
      new HashMap<String, SnapshotStore>();
  for (SnapshotRepository<? extends DocumentSnapshot> repository
      : repositories) {
    String monitorName = makeMonitorNameFromStartPath(repository.getName());
    File dir = new File(snapshotDir, monitorName);

    boolean startEmpty = (connectorManagerCheckpoint == null)
        || (!monitorPoints.containsKey(monitorName));
    if (startEmpty) {
      LOG.info("Deleting " + repository.getName()
          + " global checkpoint=" + connectorManagerCheckpoint
          + " monitor checkpoint=" + monitorPoints.get(monitorName));
      // Delete this snapshot directory
      delete(dir);
    } else {
      // Repair this snapshot directory
      SnapshotStore.stitch(dir, monitorPoints.get(monitorName),
          documentSnapshotFactory);
    }

    SnapshotStore snapshotStore = new SnapshotStore(dir,
        documentSnapshotFactory);

    snapshotStores.put(monitorName, snapshotStore);
  }
  return snapshotStores;
}
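The startEmpty test decides between wiping a monitor's snapshot directory and repairing it. A small sketch of that decision in isolation may make the cases easier to see. The class StartEmptyDecision is hypothetical, and plain String values stand in for MonitorCheckpoint objects, since only the presence of a key matters here.

```java
import java.util.HashMap;
import java.util.Map;

class StartEmptyDecision {
  /**
   * Mirrors the startEmpty condition: start from an empty snapshot directory
   * when there is no global checkpoint, or when no restart point was
   * recorded for this monitor.
   */
  static boolean startEmpty(String connectorManagerCheckpoint,
      Map<String, String> monitorPoints, String monitorName) {
    return connectorManagerCheckpoint == null
        || !monitorPoints.containsKey(monitorName);
  }

  public static void main(String[] args) {
    // Hypothetical restart points; String stands in for MonitorCheckpoint.
    Map<String, String> monitorPoints = new HashMap<String, String>();
    monitorPoints.put("monitorA", "restart-point-A");

    // No global checkpoint: always start empty (delete the directory).
    System.out.println(startEmpty(null, monitorPoints, "monitorA"));
    // Global checkpoint and a known monitor: repair (stitch) instead.
    System.out.println(startEmpty("cp-1", monitorPoints, "monitorA"));
    // Global checkpoint but an unknown monitor: start empty.
    System.out.println(startEmpty("cp-1", monitorPoints, "monitorB"));
  }
}
```

Only the middle case, a non-null global checkpoint together with a recorded restart point, leads to the stitch path in the original method.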

Next, we follow the method that starts the monitor threads.

/**
 * Starts the monitor threads (MonitorCheckpoint, SnapshotStore, and monitor
 * appear to be mapped to one another).
 * Creates a {@link DocumentSnapshotRepositoryMonitor} thread for each
 * startPath.
 *
 * @throws RepositoryDocumentException if any of the threads cannot be
 *         started.
 */
private void startMonitorThreads(Map<String, SnapshotStore> snapshotStores,
    Map<String, MonitorCheckpoint> monitorPoints)
    throws RepositoryDocumentException {

  for (SnapshotRepository<? extends DocumentSnapshot> repository
      : repositories) {
    String monitorName = makeMonitorNameFromStartPath(repository.getName());
    // monitorName -> snapshotStores mapping
    // Snapshot store (reader/writer)
    SnapshotStore snapshotStore = snapshotStores.get(monitorName);
    // Create the monitor thread
    Thread monitorThread = newMonitorThread(repository, snapshotStore,
        monitorPoints.get(monitorName));
    threads.add(monitorThread);

    LOG.info("starting monitor for <" + repository.getName() + ">");
    monitorThread.setName(repository.getName());
    monitorThread.setDaemon(true);
    monitorThread.start();
  }
}

The monitor objects are created in the following method:

/**
 * Creates the monitor thread.
 * Creates a {@link DocumentSnapshotRepositoryMonitor} thread for the provided
 * folder.
 *
 * @throws RepositoryDocumentException if {@code startPath} is not readable
 *         or if there is any problem reading or writing snapshots.
 */
private Thread newMonitorThread(
    SnapshotRepository<? extends DocumentSnapshot> repository,
    SnapshotStore snapshotStore, MonitorCheckpoint startCp)
    throws RepositoryDocumentException {
  // Note the monitorName
  String monitorName = makeMonitorNameFromStartPath(repository.getName());
  // Documents are processed inside the monitor thread
  DocumentSnapshotRepositoryMonitor monitor =
      new DocumentSnapshotRepositoryMonitor(monitorName, repository,
          snapshotStore, changeQueue.newCallback(), DOCUMENT_SINK, startCp,
          documentSnapshotFactory);
  monitor.setTraversalSchedule(traversalSchedule);
  LOG.fine("Adding a new monitor for " + monitorName + ": " + monitor);
  fileSystemMonitorsByName.put(monitorName, monitor);
  return new Thread(monitor);
}
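Stripped of the connector types, the two methods above follow a common pattern: wrap one Runnable per repository in a Thread, name the thread after the repository, mark it as a daemon, record it, and start it. The sketch below shows only that pattern. MonitorThreadSketch is a hypothetical class, repository names are plain strings, and the empty Runnable is a placeholder for a real monitor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class MonitorThreadSketch {
  /** Starts one named daemon thread per repository name and returns them. */
  static List<Thread> startMonitorThreads(List<String> repositoryNames) {
    List<Thread> threads =
        Collections.synchronizedList(new ArrayList<Thread>());
    for (String repositoryName : repositoryNames) {
      // Placeholder for a DocumentSnapshotRepositoryMonitor instance.
      Runnable monitor = () -> { };
      Thread monitorThread = new Thread(monitor);
      threads.add(monitorThread);
      monitorThread.setName(repositoryName);
      // Daemon threads do not keep the JVM alive on their own.
      monitorThread.setDaemon(true);
      monitorThread.start();
    }
    return threads;
  }

  public static void main(String[] args) {
    List<Thread> threads = startMonitorThreads(Arrays.asList("repoA", "repoB"));
    for (Thread thread : threads) {
      System.out.println(thread.getName() + " daemon=" + thread.isDaemon());
    }
  }
}
```

Keeping the threads in a list is what later allows stop() to interrupt and join each one.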

The stop method stops the monitor threads:

/**
 * Stops the monitors.
 */
private void flagAllMonitorsToStop() {
  for (SnapshotRepository<? extends DocumentSnapshot> repository
      : repositories) {
    String monitorName = makeMonitorNameFromStartPath(repository.getName());
    DocumentSnapshotRepositoryMonitor
        monitor = fileSystemMonitorsByName.get(monitorName);
    if (null != monitor) {
      monitor.shutdown();
    }
    else {
      LOG.fine("Unable to stop non existent monitor thread for "
          + monitorName);
    }
  }
}

/**
 * Stops the monitor threads.
 */
/* @Override */
public synchronized void stop() {
  for (Thread thread : threads) {
    thread.interrupt();
  }
  for (Thread thread : threads) {
    try {
      thread.join(MAX_SHUTDOWN_MS);
      if (thread.isAlive()) {
        LOG.warning("failed to stop background thread: " + thread.getName());
      }
    } catch (InterruptedException e) {
      // Mark this thread as interrupted so it can be dealt with later.
      Thread.currentThread().interrupt();
    }
  }
  threads.clear();

  /* in case thread.interrupt doesn't stop monitors */
  flagAllMonitorsToStop();

  fileSystemMonitorsByName.clear();
  changeQueue.clear();
  this.isRunning = false;
}

flagAllMonitorsToStop() calls monitor.shutdown() on each monitor object, which sets the monitor's flag field:

/* The monitor should exit voluntarily if set to false */
private volatile boolean isRunning = true;
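The flag serves as a fallback for monitors that do not react to Thread.interrupt(). A minimal sketch of that idea follows. StoppableMonitor is a hypothetical class, not the connector's monitor; it deliberately swallows interrupts so that only the volatile flag can end its loop, and the 10 ms sleep stands in for one monitoring pass.

```java
class StoppableMonitor implements Runnable {
  /* The monitor should exit voluntarily if set to false */
  private volatile boolean isRunning = true;

  void shutdown() {
    isRunning = false;
  }

  @Override
  public void run() {
    while (isRunning) {
      try {
        Thread.sleep(10);  // stand-in for one monitoring pass
      } catch (InterruptedException e) {
        // This hypothetical monitor ignores interrupts on purpose,
        // so the flag is the only thing that can stop it.
      }
    }
  }

  /** Returns true if the monitor thread ended after shutdown() was called. */
  static boolean stopsViaFlag() {
    StoppableMonitor monitor = new StoppableMonitor();
    Thread thread = new Thread(monitor);
    thread.setDaemon(true);
    thread.start();

    thread.interrupt();   // has no lasting effect on this monitor
    monitor.shutdown();   // the loop sees the volatile write and exits
    try {
      thread.join(5000);  // bounded wait, in the spirit of MAX_SHUTDOWN_MS
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return !thread.isAlive();
  }

  public static void main(String[] args) {
    System.out.println(stopsViaFlag());
  }
}
```

Declaring the field volatile is what makes the write in shutdown() visible to the monitor thread without further synchronization.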

---------------------------------------------------------------------------

This series, Enterprise Search Engine Development: The Connector, is my original work.

When reposting, please credit the source: 博客园 (cnblogs), 刺猬的温驯

Author's email: [email protected]#com (replace # with .)

Article link: http://www.cnblogs.com/chenying99/p/3789613.html

Article source: http://www.bozhiyue.com/yuyan/2017/0714/1484240.html