anshdadhich commited on
Commit
deeaf27
·
verified ·
1 Parent(s): 84b3b1c

Upload src/mft/watcher.rs

Browse files
Files changed (1) hide show
  1. src/mft/watcher.rs +236 -0
src/mft/watcher.rs ADDED
@@ -0,0 +1,236 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ use std::mem;
2
+ use std::time::Duration;
3
+ use crossbeam_channel::Sender;
4
+ use windows::{
5
+ core::PCWSTR,
6
+ Win32::Foundation::HANDLE,
7
+ Win32::Storage::FileSystem::{
8
+ CreateFileW, FILE_FLAG_BACKUP_SEMANTICS, FILE_SHARE_DELETE,
9
+ FILE_SHARE_READ, FILE_SHARE_WRITE, OPEN_EXISTING,
10
+ },
11
+ Win32::System::Ioctl::{
12
+ FSCTL_QUERY_USN_JOURNAL, FSCTL_READ_USN_JOURNAL,
13
+ READ_USN_JOURNAL_DATA_V0, USN_JOURNAL_DATA_V0, USN_RECORD_V2,
14
+ USN_REASON_FILE_CREATE, USN_REASON_FILE_DELETE,
15
+ USN_REASON_RENAME_NEW_NAME, USN_REASON_RENAME_OLD_NAME,
16
+ },
17
+ Win32::System::IO::DeviceIoControl,
18
+ };
19
+ use crate::mft::types::{FileKind, FileRecord, IndexEvent, JournalCheckpoint, NtfsDrive};
20
+
21
+ const BUFFER_SIZE: usize = 64 * 1024;
22
+
23
+ pub struct UsnWatcher {
24
+ handle: HANDLE,
25
+ drive: NtfsDrive,
26
+ sender: Sender<IndexEvent>,
27
+ pub next_usn: i64,
28
+ pub journal_id: u64,
29
+ }
30
+
31
+ impl UsnWatcher {
32
+ pub fn new(
33
+ drive: &NtfsDrive,
34
+ sender: Sender<IndexEvent>,
35
+ ) -> windows::core::Result<Self> {
36
+ Self::new_from(drive, sender, None)
37
+ }
38
+
39
+ pub fn new_from(
40
+ drive: &NtfsDrive,
41
+ sender: Sender<IndexEvent>,
42
+ checkpoint: Option<&JournalCheckpoint>,
43
+ ) -> windows::core::Result<Self> {
44
+ let path: Vec<u16> = drive.device_path.encode_utf16().chain(Some(0)).collect();
45
+
46
+ let handle = unsafe {
47
+ CreateFileW(
48
+ PCWSTR(path.as_ptr()),
49
+ 0x0,
50
+ FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
51
+ None,
52
+ OPEN_EXISTING,
53
+ FILE_FLAG_BACKUP_SEMANTICS,
54
+ None,
55
+ )?
56
+ };
57
+
58
+ let mut journal_data: USN_JOURNAL_DATA_V0 = unsafe { mem::zeroed() };
59
+ let mut bytes_returned = 0u32;
60
+
61
+ unsafe {
62
+ DeviceIoControl(
63
+ handle,
64
+ FSCTL_QUERY_USN_JOURNAL,
65
+ None, 0,
66
+ Some(&mut journal_data as *mut _ as *mut _),
67
+ mem::size_of::<USN_JOURNAL_DATA_V0>() as u32,
68
+ Some(&mut bytes_returned),
69
+ None,
70
+ )?;
71
+ }
72
+
73
+ // If checkpoint matches current journal and USN is still in range, resume.
74
+ // Otherwise start from current position (don't replay old history).
75
+ let next_usn = if let Some(cp) = checkpoint {
76
+ if cp.journal_id != journal_data.UsnJournalID {
77
+ return Err(windows::core::Error::new(
78
+ windows::Win32::Foundation::ERROR_JOURNAL_NOT_ACTIVE.into(),
79
+ "Journal ID mismatch — rescan needed",
80
+ ));
81
+ }
82
+ if cp.next_usn < journal_data.FirstUsn || cp.next_usn > journal_data.NextUsn {
83
+ return Err(windows::core::Error::new(
84
+ windows::Win32::Foundation::ERROR_JOURNAL_NOT_ACTIVE.into(),
85
+ "Saved USN outside journal range — rescan needed",
86
+ ));
87
+ }
88
+ cp.next_usn
89
+ } else {
90
+ journal_data.NextUsn
91
+ };
92
+
93
+ Ok(Self {
94
+ handle,
95
+ drive: drive.clone(),
96
+ sender,
97
+ next_usn,
98
+ journal_id: journal_data.UsnJournalID,
99
+ })
100
+ }
101
+
102
+ pub fn checkpoint(&self) -> JournalCheckpoint {
103
+ JournalCheckpoint {
104
+ next_usn: self.next_usn,
105
+ journal_id: self.journal_id,
106
+ drive_letter: self.drive.letter,
107
+ }
108
+ }
109
+
110
+ pub fn run(&mut self) {
111
+ let mut buffer = vec![0u8; BUFFER_SIZE];
112
+ loop {
113
+ std::thread::sleep(Duration::from_millis(500));
114
+ self.poll(&mut buffer);
115
+ }
116
+ }
117
+
118
+ pub fn run_shared(&mut self, shared: std::sync::Arc<parking_lot::Mutex<Vec<JournalCheckpoint>>>) {
119
+ let mut buffer = vec![0u8; BUFFER_SIZE];
120
+ loop {
121
+ std::thread::sleep(Duration::from_millis(500));
122
+ self.poll(&mut buffer);
123
+ let mut cps = shared.lock();
124
+ cps.retain(|c| c.drive_letter != self.drive.letter);
125
+ cps.push(self.checkpoint());
126
+ }
127
+ }
128
+
129
+ /// Drain all pending journal entries — used for delta catch-up on startup
130
+ pub fn drain(&mut self) -> usize {
131
+ let mut buffer = vec![0u8; BUFFER_SIZE];
132
+ let mut count = 0;
133
+ loop {
134
+ let before = self.next_usn;
135
+ self.poll(&mut buffer);
136
+ if self.next_usn == before {
137
+ break;
138
+ }
139
+ count += 1;
140
+ }
141
+ count
142
+ }
143
+
144
+ fn poll(&mut self, buffer: &mut Vec<u8>) {
145
+ let read_data = READ_USN_JOURNAL_DATA_V0 {
146
+ StartUsn: self.next_usn,
147
+ ReasonMask: USN_REASON_FILE_CREATE
148
+ | USN_REASON_FILE_DELETE
149
+ | USN_REASON_RENAME_NEW_NAME
150
+ | USN_REASON_RENAME_OLD_NAME,
151
+ ReturnOnlyOnClose: 0,
152
+ Timeout: 0,
153
+ BytesToWaitFor: 0,
154
+ UsnJournalID: self.journal_id,
155
+ };
156
+
157
+ let mut bytes_returned = 0u32;
158
+ let ok = unsafe {
159
+ DeviceIoControl(
160
+ self.handle,
161
+ FSCTL_READ_USN_JOURNAL,
162
+ Some(&read_data as *const _ as *const _),
163
+ mem::size_of::<READ_USN_JOURNAL_DATA_V0>() as u32,
164
+ Some(buffer.as_mut_ptr() as *mut _),
165
+ BUFFER_SIZE as u32,
166
+ Some(&mut bytes_returned),
167
+ None,
168
+ )
169
+ };
170
+
171
+ if ok.is_err() || bytes_returned <= 8 {
172
+ return;
173
+ }
174
+
175
+ self.next_usn = i64::from_ne_bytes(buffer[0..8].try_into().unwrap());
176
+
177
+ let mut offset = 8usize;
178
+ while offset + mem::size_of::<USN_RECORD_V2>() <= bytes_returned as usize {
179
+ let record = unsafe {
180
+ &*(buffer.as_ptr().add(offset) as *const USN_RECORD_V2)
181
+ };
182
+ if record.RecordLength == 0 { break; }
183
+ self.process_record(record, buffer, offset);
184
+ offset += record.RecordLength as usize;
185
+ }
186
+ }
187
+
188
+ fn process_record(&self, record: &USN_RECORD_V2, buffer: &[u8], offset: usize) {
189
+ let name_offset = record.FileNameOffset as usize;
190
+ let name_len = record.FileNameLength as usize / 2;
191
+ let name_ptr = unsafe {
192
+ buffer.as_ptr().add(offset + name_offset) as *const u16
193
+ };
194
+ let name_slice = unsafe { std::slice::from_raw_parts(name_ptr, name_len) };
195
+ let name = String::from_utf16_lossy(name_slice);
196
+
197
+ let is_dir = (record.FileAttributes & 0x10) != 0;
198
+ let file_ref = record.FileReferenceNumber as u64;
199
+ let parent_ref = record.ParentFileReferenceNumber as u64;
200
+ let reason = record.Reason;
201
+
202
+ if reason & USN_REASON_FILE_DELETE != 0 {
203
+ let _ = self.sender.send(IndexEvent::Deleted(file_ref));
204
+ return;
205
+ }
206
+
207
+ let kind = if is_dir { FileKind::Directory } else { FileKind::File };
208
+
209
+ // Rename new name = could be a rename OR a move to different folder
210
+ if reason & USN_REASON_RENAME_NEW_NAME != 0 {
211
+ let _ = self.sender.send(IndexEvent::Moved {
212
+ file_ref,
213
+ new_parent_ref: parent_ref,
214
+ name: name.clone(),
215
+ kind: kind.clone(),
216
+ });
217
+ return;
218
+ }
219
+
220
+ if reason & USN_REASON_FILE_CREATE != 0 {
221
+ let new_record = FileRecord {
222
+ file_ref,
223
+ parent_ref,
224
+ name,
225
+ kind,
226
+ };
227
+ let _ = self.sender.send(IndexEvent::Created(new_record));
228
+ }
229
+ }
230
+ }
231
+
232
+ impl Drop for UsnWatcher {
233
+ fn drop(&mut self) {
234
+ unsafe { windows::Win32::Foundation::CloseHandle(self.handle).ok() };
235
+ }
236
+ }