| import contextlib |
| import ctypes |
| import os |
|
|
| from ctypes.wintypes import ( |
| BOOL, |
| CHAR, |
| DWORD, |
| HANDLE, |
| LONG, |
| LPWSTR, |
| MAX_PATH, |
| PDWORD, |
| ULONG, |
| ) |
|
|
| from shellingham._core import SHELL_NAMES |
|
|
|
|
| INVALID_HANDLE_VALUE = HANDLE(-1).value |
| ERROR_NO_MORE_FILES = 18 |
| ERROR_INSUFFICIENT_BUFFER = 122 |
| TH32CS_SNAPPROCESS = 2 |
| PROCESS_QUERY_LIMITED_INFORMATION = 0x1000 |
|
|
|
|
| kernel32 = ctypes.windll.kernel32 |
|
|
|
|
| def _check_handle(error_val=0): |
| def check(ret, func, args): |
| if ret == error_val: |
| raise ctypes.WinError() |
| return ret |
|
|
| return check |
|
|
|
|
| def _check_expected(expected): |
| def check(ret, func, args): |
| if ret: |
| return True |
| code = ctypes.GetLastError() |
| if code == expected: |
| return False |
| raise ctypes.WinError(code) |
|
|
| return check |
|
|
|
|
| class ProcessEntry32(ctypes.Structure): |
| _fields_ = ( |
| ("dwSize", DWORD), |
| ("cntUsage", DWORD), |
| ("th32ProcessID", DWORD), |
| ("th32DefaultHeapID", ctypes.POINTER(ULONG)), |
| ("th32ModuleID", DWORD), |
| ("cntThreads", DWORD), |
| ("th32ParentProcessID", DWORD), |
| ("pcPriClassBase", LONG), |
| ("dwFlags", DWORD), |
| ("szExeFile", CHAR * MAX_PATH), |
| ) |
|
|
|
|
| kernel32.CloseHandle.argtypes = [HANDLE] |
| kernel32.CloseHandle.restype = BOOL |
|
|
| kernel32.CreateToolhelp32Snapshot.argtypes = [DWORD, DWORD] |
| kernel32.CreateToolhelp32Snapshot.restype = HANDLE |
| kernel32.CreateToolhelp32Snapshot.errcheck = _check_handle( |
| INVALID_HANDLE_VALUE, |
| ) |
|
|
| kernel32.Process32First.argtypes = [HANDLE, ctypes.POINTER(ProcessEntry32)] |
| kernel32.Process32First.restype = BOOL |
| kernel32.Process32First.errcheck = _check_expected( |
| ERROR_NO_MORE_FILES, |
| ) |
|
|
| kernel32.Process32Next.argtypes = [HANDLE, ctypes.POINTER(ProcessEntry32)] |
| kernel32.Process32Next.restype = BOOL |
| kernel32.Process32Next.errcheck = _check_expected( |
| ERROR_NO_MORE_FILES, |
| ) |
|
|
| kernel32.GetCurrentProcessId.argtypes = [] |
| kernel32.GetCurrentProcessId.restype = DWORD |
|
|
| kernel32.OpenProcess.argtypes = [DWORD, BOOL, DWORD] |
| kernel32.OpenProcess.restype = HANDLE |
| kernel32.OpenProcess.errcheck = _check_handle( |
| INVALID_HANDLE_VALUE, |
| ) |
|
|
| kernel32.QueryFullProcessImageNameW.argtypes = [HANDLE, DWORD, LPWSTR, PDWORD] |
| kernel32.QueryFullProcessImageNameW.restype = BOOL |
| kernel32.QueryFullProcessImageNameW.errcheck = _check_expected( |
| ERROR_INSUFFICIENT_BUFFER, |
| ) |
|
|
|
|
| @contextlib.contextmanager |
| def _handle(f, *args, **kwargs): |
| handle = f(*args, **kwargs) |
| try: |
| yield handle |
| finally: |
| kernel32.CloseHandle(handle) |
|
|
|
|
| def _iter_processes(): |
| f = kernel32.CreateToolhelp32Snapshot |
| with _handle(f, TH32CS_SNAPPROCESS, 0) as snap: |
| entry = ProcessEntry32() |
| entry.dwSize = ctypes.sizeof(entry) |
| ret = kernel32.Process32First(snap, entry) |
| while ret: |
| yield entry |
| ret = kernel32.Process32Next(snap, entry) |
|
|
|
|
| def _get_full_path(proch): |
| size = DWORD(MAX_PATH) |
| while True: |
| path_buff = ctypes.create_unicode_buffer("", size.value) |
| if kernel32.QueryFullProcessImageNameW(proch, 0, path_buff, size): |
| return path_buff.value |
| size.value *= 2 |
|
|
|
|
| def get_shell(pid=None, max_depth=10): |
| proc_map = { |
| proc.th32ProcessID: (proc.th32ParentProcessID, proc.szExeFile) |
| for proc in _iter_processes() |
| } |
| pid = pid or os.getpid() |
|
|
| for _ in range(0, max_depth + 1): |
| try: |
| ppid, executable = proc_map[pid] |
| except KeyError: |
| break |
|
|
| |
| |
| |
| |
| |
| |
| if isinstance(executable, bytes): |
| executable = executable.decode("mbcs", "replace") |
|
|
| name = executable.rpartition(".")[0].lower() |
| if name not in SHELL_NAMES: |
| pid = ppid |
| continue |
|
|
| key = PROCESS_QUERY_LIMITED_INFORMATION |
| with _handle(kernel32.OpenProcess, key, 0, pid) as proch: |
| return (name, _get_full_path(proch)) |
|
|
| return None |
|
|