anshdadhich commited on
Commit
60a0d5a
·
verified ·
1 Parent(s): e0ba7be

Upload fastsearch-core/src/mft/watcher.rs

Browse files
Files changed (1) hide show
  1. fastsearch-core/src/mft/watcher.rs +232 -0
fastsearch-core/src/mft/watcher.rs ADDED
@@ -0,0 +1,232 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ use std::mem;
2
+ use std::time::Duration;
3
+ use crossbeam_channel::Sender;
4
+ use windows::{
5
+ core::PCWSTR,
6
+ Win32::Foundation::HANDLE,
7
+ Win32::Storage::FileSystem::{
8
+ CreateFileW, FILE_FLAG_BACKUP_SEMANTICS, FILE_SHARE_DELETE,
9
+ FILE_SHARE_READ, FILE_SHARE_WRITE, OPEN_EXISTING,
10
+ },
11
+ Win32::System::Ioctl::{
12
+ FSCTL_QUERY_USN_JOURNAL, FSCTL_READ_USN_JOURNAL,
13
+ READ_USN_JOURNAL_DATA_V0, USN_JOURNAL_DATA_V0, USN_RECORD_V2,
14
+ USN_REASON_FILE_CREATE, USN_REASON_FILE_DELETE,
15
+ USN_REASON_RENAME_NEW_NAME, USN_REASON_RENAME_OLD_NAME,
16
+ },
17
+ Win32::System::IO::DeviceIoControl,
18
+ };
19
+ use crate::mft::types::{FileKind, FileRecord, IndexEvent, JournalCheckpoint, NtfsDrive};
20
+
21
+ const BUFFER_SIZE: usize = 64 * 1024;
22
+
23
+ pub struct UsnWatcher {
24
+ handle: HANDLE,
25
+ drive: NtfsDrive,
26
+ sender: Sender<IndexEvent>,
27
+ pub next_usn: i64,
28
+ pub journal_id: u64,
29
+ }
30
+
31
+ impl UsnWatcher {
32
+ pub fn new(
33
+ drive: &NtfsDrive,
34
+ sender: Sender<IndexEvent>,
35
+ ) -> windows::core::Result<Self> {
36
+ Self::new_from(drive, sender, None)
37
+ }
38
+
39
+ pub fn new_from(
40
+ drive: &NtfsDrive,
41
+ sender: Sender<IndexEvent>,
42
+ checkpoint: Option<&JournalCheckpoint>,
43
+ ) -> windows::core::Result<Self> {
44
+ let path: Vec<u16> = drive.device_path.encode_utf16().chain(Some(0)).collect();
45
+
46
+ let handle = unsafe {
47
+ CreateFileW(
48
+ PCWSTR(path.as_ptr()),
49
+ 0x0,
50
+ FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
51
+ None,
52
+ OPEN_EXISTING,
53
+ FILE_FLAG_BACKUP_SEMANTICS,
54
+ None,
55
+ )?
56
+ };
57
+
58
+ let mut journal_data: USN_JOURNAL_DATA_V0 = unsafe { mem::zeroed() };
59
+ let mut bytes_returned = 0u32;
60
+
61
+ unsafe {
62
+ DeviceIoControl(
63
+ handle,
64
+ FSCTL_QUERY_USN_JOURNAL,
65
+ None, 0,
66
+ Some(&mut journal_data as *mut _ as *mut _),
67
+ mem::size_of::<USN_JOURNAL_DATA_V0>() as u32,
68
+ Some(&mut bytes_returned),
69
+ None,
70
+ )?;
71
+ }
72
+
73
+ let next_usn = if let Some(cp) = checkpoint {
74
+ if cp.journal_id != journal_data.UsnJournalID {
75
+ return Err(windows::core::Error::new(
76
+ windows::Win32::Foundation::ERROR_JOURNAL_NOT_ACTIVE.into(),
77
+ "Journal ID mismatch — rescan needed",
78
+ ));
79
+ }
80
+ if cp.next_usn < journal_data.FirstUsn || cp.next_usn > journal_data.NextUsn {
81
+ return Err(windows::core::Error::new(
82
+ windows::Win32::Foundation::ERROR_JOURNAL_NOT_ACTIVE.into(),
83
+ "Saved USN outside journal range — rescan needed",
84
+ ));
85
+ }
86
+ cp.next_usn
87
+ } else {
88
+ journal_data.NextUsn
89
+ };
90
+
91
+ Ok(Self {
92
+ handle,
93
+ drive: drive.clone(),
94
+ sender,
95
+ next_usn,
96
+ journal_id: journal_data.UsnJournalID,
97
+ })
98
+ }
99
+
100
+ pub fn checkpoint(&self) -> JournalCheckpoint {
101
+ JournalCheckpoint {
102
+ next_usn: self.next_usn,
103
+ journal_id: self.journal_id,
104
+ drive_letter: self.drive.letter,
105
+ }
106
+ }
107
+
108
+ pub fn run(&mut self) {
109
+ let mut buffer = vec![0u8; BUFFER_SIZE];
110
+ loop {
111
+ std::thread::sleep(Duration::from_millis(500));
112
+ self.poll(&mut buffer);
113
+ }
114
+ }
115
+
116
+ pub fn run_shared(&mut self, shared: std::sync::Arc<parking_lot::Mutex<Vec<JournalCheckpoint>>>) {
117
+ let mut buffer = vec![0u8; BUFFER_SIZE];
118
+ loop {
119
+ std::thread::sleep(Duration::from_millis(500));
120
+ self.poll(&mut buffer);
121
+ let mut cps = shared.lock();
122
+ cps.retain(|c| c.drive_letter != self.drive.letter);
123
+ cps.push(self.checkpoint());
124
+ }
125
+ }
126
+
127
+ pub fn drain(&mut self) -> usize {
128
+ let mut buffer = vec![0u8; BUFFER_SIZE];
129
+ let mut count = 0;
130
+ loop {
131
+ let before = self.next_usn;
132
+ self.poll(&mut buffer);
133
+ if self.next_usn == before {
134
+ break;
135
+ }
136
+ count += 1;
137
+ }
138
+ count
139
+ }
140
+
141
+ fn poll(&mut self, buffer: &mut Vec<u8>) {
142
+ let read_data = READ_USN_JOURNAL_DATA_V0 {
143
+ StartUsn: self.next_usn,
144
+ ReasonMask: USN_REASON_FILE_CREATE
145
+ | USN_REASON_FILE_DELETE
146
+ | USN_REASON_RENAME_NEW_NAME
147
+ | USN_REASON_RENAME_OLD_NAME,
148
+ ReturnOnlyOnClose: 0,
149
+ Timeout: 0,
150
+ BytesToWaitFor: 0,
151
+ UsnJournalID: self.journal_id,
152
+ };
153
+
154
+ let mut bytes_returned = 0u32;
155
+ let ok = unsafe {
156
+ DeviceIoControl(
157
+ self.handle,
158
+ FSCTL_READ_USN_JOURNAL,
159
+ Some(&read_data as *const _ as *const _),
160
+ mem::size_of::<READ_USN_JOURNAL_DATA_V0>() as u32,
161
+ Some(buffer.as_mut_ptr() as *mut _),
162
+ BUFFER_SIZE as u32,
163
+ Some(&mut bytes_returned),
164
+ None,
165
+ )
166
+ };
167
+
168
+ if ok.is_err() || bytes_returned <= 8 {
169
+ return;
170
+ }
171
+
172
+ self.next_usn = i64::from_ne_bytes(buffer[0..8].try_into().unwrap());
173
+
174
+ let mut offset = 8usize;
175
+ while offset + mem::size_of::<USN_RECORD_V2>() <= bytes_returned as usize {
176
+ let record = unsafe {
177
+ &*(buffer.as_ptr().add(offset) as *const USN_RECORD_V2)
178
+ };
179
+ if record.RecordLength == 0 { break; }
180
+ self.process_record(record, buffer, offset);
181
+ offset += record.RecordLength as usize;
182
+ }
183
+ }
184
+
185
+ fn process_record(&self, record: &USN_RECORD_V2, buffer: &[u8], offset: usize) {
186
+ let name_offset = record.FileNameOffset as usize;
187
+ let name_len = record.FileNameLength as usize / 2;
188
+ let name_ptr = unsafe {
189
+ buffer.as_ptr().add(offset + name_offset) as *const u16
190
+ };
191
+ let name_slice = unsafe { std::slice::from_raw_parts(name_ptr, name_len) };
192
+ let name = String::from_utf16_lossy(name_slice);
193
+
194
+ let is_dir = (record.FileAttributes & 0x10) != 0;
195
+ let file_ref = record.FileReferenceNumber as u64;
196
+ let parent_ref = record.ParentFileReferenceNumber as u64;
197
+ let reason = record.Reason;
198
+
199
+ if reason & USN_REASON_FILE_DELETE != 0 {
200
+ let _ = self.sender.send(IndexEvent::Deleted(file_ref));
201
+ return;
202
+ }
203
+
204
+ let kind = if is_dir { FileKind::Directory } else { FileKind::File };
205
+
206
+ if reason & USN_REASON_RENAME_NEW_NAME != 0 {
207
+ let _ = self.sender.send(IndexEvent::Moved {
208
+ file_ref,
209
+ new_parent_ref: parent_ref,
210
+ name: name.clone(),
211
+ kind: kind.clone(),
212
+ });
213
+ return;
214
+ }
215
+
216
+ if reason & USN_REASON_FILE_CREATE != 0 {
217
+ let new_record = FileRecord {
218
+ file_ref,
219
+ parent_ref,
220
+ name,
221
+ kind,
222
+ };
223
+ let _ = self.sender.send(IndexEvent::Created(new_record));
224
+ }
225
+ }
226
+ }
227
+
228
+ impl Drop for UsnWatcher {
229
+ fn drop(&mut self) {
230
+ unsafe { windows::Win32::Foundation::CloseHandle(self.handle).ok() };
231
+ }
232
+ }