Source code for volatility.framework.automagic.pdbscan

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
"""A module for scanning translation layers looking for Windows PDB records
from loaded PE files.

This module contains a standalone scanner, and also a :class:`~volatility.framework.interfaces.layers.ScannerInterface`
based scanner for use within the framework by calling :func:`~volatility.framework.interfaces.layers.DataLayerInterface.scan`.
"""

import json
import logging
import lzma
import math
import os
import struct
from typing import Any, Dict, Generator, Iterable, List, Optional, Set, Tuple, Union
from urllib import request

from volatility import symbols
from volatility.framework import constants, exceptions, interfaces, layers
from volatility.framework.configuration import requirements
from volatility.framework.layers import intel, scanners
from volatility.framework.symbols import intermed, native
from volatility.framework.symbols.windows import pdbconv

if __name__ == "__main__":
    import sys

    sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))))

vollog = logging.getLogger(__name__)

ValidKernelsType = Dict[str, Tuple[int, Dict[str, Optional[Union[bytes, str, int]]]]]
KernelsType = Iterable[Dict[str, Any]]


class PdbSignatureScanner(interfaces.layers.ScannerInterface):
    """Scanner that reports Windows PDB (``RSDS``) debug records found in a buffer.

    It is a :class:`~volatility.framework.interfaces.layers.ScannerInterface`
    implementation, so an instance can be handed to a layer's ``scan`` method.

    Args:
        pdb_names: Byte strings naming the PDB files of interest; a record is only
            reported when the name embedded in it is one of these

    .. note:: The pdb_names must be byte strings, because they are compared directly
        against slices of the scanned data; unicode strs will never match
    """

    overlap = 0x4000
    """The size of overlap needed for the signature to ensure data cannot hide between two scanned chunks"""
    thread_safe = True
    """Determines whether the scanner accesses global variables in a thread safe manner (for use with :mod:`multiprocessing`)"""

    # Sixteen raw GUID bytes followed by one little-endian 32-bit value (reported as the age)
    _RSDS_format = struct.Struct("<16BI")

    def __init__(self, pdb_names: List[bytes]) -> None:
        super().__init__()
        self._pdb_names = pdb_names

    def __call__(self, data: bytes, data_offset: int) -> Generator[Tuple[str, Any, bytes, int], None, None]:
        """Yields every matching RSDS record that starts within this chunk.

        Args:
            data: The chunk of bytes to search
            data_offset: The offset of `data` within the layer being scanned

        Returns:
            A generator of (GUID hex string, age, pdb name, offset of the signature) tuples
        """
        marker = b"RSDS"
        header_size = self._RSDS_format.size
        # Output position -> index of the raw byte to emit.  The first three GUID
        # fields are byte-swapped (mixed endianness), the final eight bytes are not.
        guid_byte_order = (3, 2, 1, 0, 5, 4, 7, 6, 8, 9, 10, 11, 12, 13, 14, 15)

        position = data.find(marker)
        while position >= 0:
            name_start = position + 4 + header_size
            terminator = data.find(b'\0', name_start)
            # Only accept records whose name is terminated and plausibly short
            if terminator > -1 and (terminator - position - header_size) <= 100:
                candidate = data[name_start:terminator]
                if candidate in self._pdb_names:
                    fields = self._RSDS_format.unpack(data[position + 4:name_start])
                    age = fields[16]
                    guid = ''.join('{:02X}'.format(fields[index]) for index in guid_byte_order)
                    # Signatures starting in the overlap area are left for the next chunk to report
                    if position < self.chunk_size:
                        yield (guid, age, candidate, data_offset + position)
            position = data.find(marker, position + 1)
def scan(ctx: interfaces.context.ContextInterface,
         layer_name: str,
         page_size: int,
         progress_callback: constants.ProgressCallback = None,
         start: Optional[int] = None,
         end: Optional[int] = None) -> Generator[Dict[str, Optional[Union[bytes, str, int]]], None, None]:
    """Scans `layer_name` within `ctx` for RSDS records naming a known kernel pdb.

    The pdb names searched for are built from `constants.windows.KERNEL_MODULE_NAMES`.
    For each record found, the pages from the one holding the signature downwards
    are inspected for an ``MZ`` header; the walk stops at the first invalid page and
    never revisits pages at or below the page of the previously reported signature.

    Args:
        ctx: The context containing the layer to scan
        layer_name: The name of the layer to scan
        page_size: The page size used to step backwards when looking for the MZ header
        progress_callback: Means of providing the user with feedback during long processes
        start: The first offset to scan (defaults to the layer's minimum address)
        end: The final offset to scan (defaults to the layer's maximum address)

    Returns:
        A generator of dictionaries with the keys `GUID`, `age`, `pdb_name`,
        `signature_offset` and `mz_offset` (`mz_offset` is None when no MZ header was located)

    .. note:: This is automagical and therefore not guaranteed to provide correct results.

    The UI should always provide the user an opportunity to specify the
    appropriate types and PDB values themselves
    """
    kernel_pdb_names = [bytes(module + ".pdb", "utf-8") for module in constants.windows.KERNEL_MODULE_NAMES]
    layer = ctx.layers[layer_name]

    if start is None:
        start = layer.minimum_address
    if end is None:
        end = layer.maximum_address

    hits = layer.scan(ctx,
                      PdbSignatureScanner(kernel_pdb_names),
                      progress_callback = progress_callback,
                      sections = [(start, end - start)])

    lowest_pfn = 0
    for guid, age, pdb_name, signature_offset in hits:
        signature_pfn = signature_offset // page_size

        # Walk down page by page from the signature looking for the PE header
        mz_offset = None
        for pfn in range(signature_pfn, lowest_pfn, -1):
            page_address = pfn * page_size
            if not layer.is_valid(page_address, 2):
                break
            if layer.read(page_address, 2) == b'MZ':
                mz_offset = page_address
                break
        lowest_pfn = signature_pfn

        yield {
            'GUID': guid,
            'age': age,
            'pdb_name': str(pdb_name, "utf-8"),
            'signature_offset': signature_offset,
            'mz_offset': mz_offset
        }
class KernelPDBScanner(interfaces.automagic.AutomagicInterface):
    """Windows symbol loader based on PDB signatures.

    An Automagic object that looks for all Intel translation layers and scans each of them for a pdb signature.
    When found, a search for a corresponding Intermediate Format data file is carried out and if found an appropriate
    symbol space is automatically loaded.

    Once a specific kernel PDB signature has been found, a virtual address for the loaded kernel is determined
    by one of the strategies listed in `methods`, tried in order:

    * following a pointer stored near a ``KDBG`` marker found in the physical layer,
    * following a pointer stored just before a ``\\SystemRoot\\system32\\nt`` module path string,
    * assuming a specific mapping from the kernel's physical address to its virtual address
      (the kernel is loaded at its physical location plus a specific offset).
    """
    priority = 30
    # Number of bytes scanned for a PDB record after a candidate kernel base address
    max_pdb_size = 0x400000

    def find_virtual_layers_from_req(self, context: interfaces.context.ContextInterface, config_path: str,
                                     requirement: interfaces.configuration.RequirementInterface) -> List[str]:
        """Traverses the requirement tree, rooted at `requirement` looking for
        virtual layers that might contain a windows PDB.

        Args:
            context: The context in which the `requirement` lives
            config_path: The path within the `context` for the `requirement`'s configuration variables
            requirement: The root of the requirement tree to search for
                :class:`~volatility.framework.configuration.requirements.TranslationLayerRequirement` objects

        Returns:
            A list of the names of the Intel virtual layers found in the tree
        """
        sub_config_path = interfaces.configuration.path_join(config_path, requirement.name)
        results = []  # type: List[str]
        if isinstance(requirement, requirements.TranslationLayerRequirement):
            # Check for symbols in this layer
            # FIXME: optionally allow a full (slow) scan
            # FIXME: Determine the physical layer no matter the virtual layer
            virtual_layer_name = context.config.get(sub_config_path, None)
            layer_name = context.config.get(interfaces.configuration.path_join(sub_config_path, "memory_layer"), None)
            # Only layers that are configured on top of a memory layer are of interest
            if layer_name and virtual_layer_name:
                memlayer = context.layers[virtual_layer_name]
                if isinstance(memlayer, intel.Intel):
                    results = [virtual_layer_name]
        else:
            for subreq in requirement.requirements.values():
                results += self.find_virtual_layers_from_req(context, sub_config_path, subreq)
        return results

    def recurse_symbol_fulfiller(self,
                                 context: interfaces.context.ContextInterface,
                                 valid_kernels: ValidKernelsType,
                                 progress_callback: constants.ProgressCallback = None) -> None:
        """Fulfills the SymbolTableRequirements in `self._symbol_requirements`
        (populated by `__call__`).

        This pass will construct any requirements that may need it in the context it was passed

        Args:
            context: Context on which to operate
            valid_kernels: Mapping of virtual layer name to (kernel virtual offset, kernel record)
            progress_callback: Means of providing the user with feedback during long processes

        Raises:
            TypeError: If the pdb name or GUID of a kernel record is not a string
        """
        join = interfaces.configuration.path_join
        for sub_config_path, requirement in self._symbol_requirements:
            # TODO: Potentially think about multiple symbol requirements in both the same and different levels of the requirement tree
            # TODO: Consider whether a single found kernel can fulfill multiple requirements
            if valid_kernels:
                # TODO: Check that the symbols for this kernel will fulfill the requirement
                for virtual_layer in valid_kernels:
                    isf_path = None
                    _kvo, kernel = valid_kernels[virtual_layer]
                    if not isinstance(kernel['pdb_name'], str) or not isinstance(kernel['GUID'], str):
                        raise TypeError("PDB name or GUID not a string value")

                    filter_string = os.path.join(kernel['pdb_name'], kernel['GUID'] + "-" + str(kernel['age']))
                    # Take the first result of search for the intermediate file
                    for value in intermed.IntermediateSymbolTable.file_symbol_url("windows", filter_string):
                        isf_path = value
                        break
                    else:
                        # If none are found, attempt to download the pdb, convert it and try again
                        self.download_pdb_isf(kernel['GUID'], kernel['age'], kernel['pdb_name'], progress_callback)
                        # Try again
                        for value in intermed.IntermediateSymbolTable.file_symbol_url("windows", filter_string):
                            isf_path = value
                            break

                    if isf_path:
                        vollog.debug("Using symbol library: {}".format(filter_string))
                        clazz = "volatility.framework.symbols.windows.WindowsKernelIntermedSymbols"
                        # Set the discovered options
                        context.config[join(sub_config_path, "class")] = clazz
                        context.config[join(sub_config_path, "isf_url")] = isf_path
                        # Construct the appropriate symbol table
                        config_path = interfaces.configuration.parent_path(sub_config_path)
                        if isinstance(requirement, interfaces.configuration.ConstructableRequirementInterface):
                            requirement.construct(context, config_path)
                        # The first kernel with a usable symbol file fulfills this requirement
                        break
                    else:
                        vollog.debug("Required symbol library path not found: {}".format(filter_string))
            else:
                vollog.debug("No suitable kernel pdb signature found")

    def download_pdb_isf(self,
                         guid: str,
                         age: int,
                         pdb_name: str,
                         progress_callback: constants.ProgressCallback = None) -> None:
        """Attempts to download the PDB file, convert it to an ISF file and
        save it to one of the symbol locations.

        Each directory in `symbols.__path__` is tried in turn; a directory that
        raises :class:`PermissionError` is skipped.  A warning is logged if no
        directory could be written to.

        Args:
            guid: The GUID string of the PDB to download
            age: The age of the PDB to download
            pdb_name: The file name of the PDB to download
            progress_callback: Means of providing the user with feedback during long processes
        """
        # Check for writability
        filter_string = os.path.join(pdb_name, guid + "-" + str(age))
        for path in symbols.__path__:

            # Store any temporary files created by downloading PDB files
            tmp_files = []
            potential_output_filename = os.path.join(path, "windows", filter_string + ".json.xz")
            # Bound before the try block: the finally clause reads it even when
            # os.makedirs itself raises
            data_written = False
            try:
                os.makedirs(os.path.dirname(potential_output_filename), exist_ok = True)
                with lzma.open(potential_output_filename, "w") as of:
                    # Once we haven't thrown an error, do the computation
                    filename = pdbconv.PdbRetreiver().retreive_pdb(guid + str(age),
                                                                   file_name = pdb_name,
                                                                   progress_callback = progress_callback)
                    if filename:
                        tmp_files.append(filename)
                        location = "file:" + request.pathname2url(tmp_files[-1])
                        json_output = pdbconv.PdbReader(self.context, location, progress_callback).get_json()
                        of.write(bytes(json.dumps(json_output, indent = 2, sort_keys = True), 'utf-8'))
                        # After we've successfully written it out, record the fact so we don't clear it out
                        data_written = True
                    else:
                        vollog.warning("Symbol file could not be found on remote server" + (" " * 100))
                # The location was writable, so there is no need to try the others
                break
            except PermissionError:
                continue
            finally:
                # If something else failed, removed the symbol file so we don't pick it up in the future
                if not data_written and os.path.exists(potential_output_filename):
                    os.remove(potential_output_filename)
                # Clear out all the temporary file if we constructed one
                for tmp_file in tmp_files:
                    try:
                        os.remove(tmp_file)
                    except PermissionError:
                        vollog.warning("Temporary file could not be removed: {}".format(tmp_file))
        else:
            vollog.warning("Cannot write downloaded symbols, please add the appropriate symbols"
                           " or add/modify a symbols directory that is writable")

    def set_kernel_virtual_offset(self, context: interfaces.context.ContextInterface,
                                  valid_kernels: ValidKernelsType) -> None:
        """Stores the kernel virtual offset of each entry in `valid_kernels` in
        the configuration of the layer it was found in.

        Args:
            context: Context on which to operate and provide the kernel virtual offset
            valid_kernels: Mapping of virtual layer name to (kernel virtual offset, kernel record)
        """
        for virtual_layer in valid_kernels:
            # Set the virtual offset under the TranslationLayer it applies to
            kvo_path = interfaces.configuration.path_join(context.layers[virtual_layer].config_path,
                                                          'kernel_virtual_offset')
            kvo, _kernel = valid_kernels[virtual_layer]
            context.config[kvo_path] = kvo
            vollog.debug("Setting kernel_virtual_offset to {}".format(hex(kvo)))

    def get_physical_layer_name(self, context, vlayer):
        """Returns the `memory_layer` value configured under `vlayer`'s config path (None if unset)."""
        return context.config.get(interfaces.configuration.path_join(vlayer.config_path, 'memory_layer'), None)

    def method_fixed_mapping(self,
                             context: interfaces.context.ContextInterface,
                             vlayer: layers.intel.Intel,
                             progress_callback: constants.ProgressCallback = None) -> ValidKernelsType:
        """Method for finding a suitable kernel offset by assuming a fixed
        physical-to-virtual mapping.

        The physical layer is scanned for kernel PDB records; for each one with
        an MZ header, a candidate virtual offset is derived from the header's
        physical offset and accepted if `vlayer` maps it back to that offset.
        On success the offset is also written to the layer's
        `kernel_virtual_offset` configuration value.

        Args:
            context: Context on which to operate
            vlayer: The Intel virtual layer to test
            progress_callback: Means of providing the user with feedback during long processes

        Returns:
            A mapping of the virtual layer name to (kernel virtual offset, kernel record), empty if none matched
        """
        # TODO: Verify this is a windows image
        vollog.debug("Kernel base determination - testing fixed base address")
        valid_kernels = {}  # type: ValidKernelsType
        virtual_layer_name = vlayer.name
        physical_layer_name = self.get_physical_layer_name(context, vlayer)
        kvo_path = interfaces.configuration.path_join(vlayer.config_path, 'kernel_virtual_offset')

        kernels = scan(ctx = context,
                       layer_name = physical_layer_name,
                       page_size = vlayer.page_size,
                       progress_callback = progress_callback)
        for kernel in kernels:
            # It seems the kernel is loaded at a fixed mapping (presumably because the memory manager hasn't started yet)
            if kernel['mz_offset'] is None or not isinstance(kernel['mz_offset'], int):
                # Rule out kernels that couldn't find a suitable MZ header
                continue
            if vlayer.bits_per_register == 64:
                # Set the top five bits of the layer's address width
                kvo = kernel['mz_offset'] + (31 << int(math.ceil(math.log2(vlayer.maximum_address + 1)) - 5))
            else:
                # Set the top bit of the register width
                kvo = kernel['mz_offset'] + (1 << (vlayer.bits_per_register - 1))
            try:
                kvp = vlayer.mapping(kvo, 0)
                # Each mapping entry is unpacked as a 4-tuple; the second item is compared
                # with the MZ offset and the fourth with the physical layer name
                if any(p == kernel['mz_offset'] and layer_name == physical_layer_name
                       for (_, p, _, layer_name) in kvp):
                    valid_kernels[virtual_layer_name] = (kvo, kernel)
                    # Sit the virtual offset under the TranslationLayer it applies to
                    context.config[kvo_path] = kvo
                    vollog.debug("Setting kernel_virtual_offset to {}".format(hex(kvo)))
                    break
                else:
                    vollog.debug("Potential kernel_virtual_offset did not map to expected location: {}".format(
                        hex(kvo)))
            except exceptions.InvalidAddressException:
                vollog.debug("Potential kernel_virtual_offset caused a page fault: {}".format(hex(kvo)))
        return valid_kernels

    def method_module_offset(self,
                             context: interfaces.context.ContextInterface,
                             vlayer: layers.intel.Intel,
                             progress_callback: constants.ProgressCallback = None) -> ValidKernelsType:
        """Method for finding a suitable kernel offset based on a module table.

        Args:
            context: Context on which to operate
            vlayer: The Intel virtual layer to test
            progress_callback: Means of providing the user with feedback during long processes

        Returns:
            A mapping of the virtual layer name to (kernel virtual offset, kernel record), empty if none matched
        """
        vollog.debug("Kernel base determination - searching layer module list structure")
        valid_kernels = {}  # type: ValidKernelsType
        # If we're here, chances are high we're in a Win10 x64 image with kernel base randomization
        physical_layer_name = self.get_physical_layer_name(context, vlayer)
        physical_layer = context.layers[physical_layer_name]
        # TODO:  On older windows, this might be \WINDOWS\system32\nt rather than \SystemRoot\system32\nt
        results = physical_layer.scan(context,
                                      scanners.BytesScanner(b"\\SystemRoot\\system32\\nt"),
                                      progress_callback = progress_callback)
        seen = set()  # type: Set[int]
        # Because this will launch a scan of the virtual layer, we want to be careful
        for result in results:
            # TODO: Identify the specific structure we're finding and document this a bit better
            pointer = context.object("pdbscan!unsigned long long",
                                     offset = (result - 16 - vlayer.bits_per_register // 8),
                                     layer_name = physical_layer_name)
            address = pointer & vlayer.address_mask
            # Each candidate address triggers a scan, so never test one twice
            if address in seen:
                continue
            seen.add(address)

            valid_kernels = self.check_kernel_offset(context, vlayer, address, progress_callback)

            if valid_kernels:
                break
        return valid_kernels

    def method_kdbg_offset(self,
                           context: interfaces.context.ContextInterface,
                           vlayer: layers.intel.Intel,
                           progress_callback: constants.ProgressCallback = None) -> ValidKernelsType:
        """Method for finding a suitable kernel offset from the pointer stored
        eight bytes after a ``KDBG`` marker in the physical layer.

        Args:
            context: Context on which to operate
            vlayer: The Intel virtual layer to test
            progress_callback: Means of providing the user with feedback during long processes

        Returns:
            A mapping of the virtual layer name to (kernel virtual offset, kernel record), empty if none matched
        """
        vollog.debug("Kernel base determination - using KDBG structure for kernel offset")
        valid_kernels = {}  # type: ValidKernelsType
        physical_layer_name = self.get_physical_layer_name(context, vlayer)
        physical_layer = context.layers[physical_layer_name]
        results = physical_layer.scan(context, scanners.BytesScanner(b"KDBG"), progress_callback = progress_callback)
        seen = set()  # type: Set[int]
        for result in results:
            # TODO: Identify the specific structure we're finding and document this a bit better
            pointer = context.object("pdbscan!unsigned long long",
                                     offset = result + 8,
                                     layer_name = physical_layer_name)
            address = pointer & vlayer.address_mask
            # Each candidate address triggers a scan, so never test one twice
            if address in seen:
                continue
            seen.add(address)

            valid_kernels = self.check_kernel_offset(context, vlayer, address, progress_callback)

            if valid_kernels:
                break

        return valid_kernels

    def check_kernel_offset(self,
                            context: interfaces.context.ContextInterface,
                            vlayer: layers.intel.Intel,
                            address: int,
                            progress_callback: constants.ProgressCallback = None) -> ValidKernelsType:
        """Checks whether a kernel image appears to live at `address` in `vlayer`.

        The address must hold an ``MZ`` header; the following `max_pdb_size`
        bytes are then scanned for a kernel PDB record.

        Args:
            context: Context on which to operate
            vlayer: The Intel virtual layer containing `address`
            address: The candidate virtual address of the kernel image
            progress_callback: Means of providing the user with feedback during long processes

        Returns:
            A mapping of the virtual layer name to (`address`, first kernel record found),
            empty if nothing was found or the address could not be read
        """
        # Scan a few megs of the virtual space at the location to see if they're potential kernels

        valid_kernels = {}  # type: ValidKernelsType
        virtual_layer_name = vlayer.name
        try:
            if vlayer.read(address, 0x2) == b'MZ':
                res = list(
                    scan(ctx = context,
                         layer_name = vlayer.name,
                         page_size = vlayer.page_size,
                         progress_callback = progress_callback,
                         start = address,
                         end = address + self.max_pdb_size))
                if res:
                    valid_kernels[virtual_layer_name] = (address, res[0])
        except exceptions.InvalidAddressException:
            # An unreadable candidate address simply is not a kernel
            pass
        return valid_kernels

    # List of methods to be run, in order, to determine the valid kernels
    methods = [method_kdbg_offset, method_module_offset, method_fixed_mapping]

    def determine_valid_kernels(self,
                                context: interfaces.context.ContextInterface,
                                potential_layers: List[str],
                                progress_callback: constants.ProgressCallback = None) -> ValidKernelsType:
        """Runs through the identified potential layers and determines the
        kernel each of them contains.

        For every Intel layer the strategies in `methods` are tried in order
        until one of them reports a kernel.  The kernels found for all of the
        layers are merged into a single result.

        Args:
            context: Context on which to operate
            potential_layers: Names of the virtual layers that might contain a kernel
            progress_callback: Function taking a percentage and optional description to be called
                during expensive computations to indicate progress

        Returns:
            A dictionary of valid kernels, keyed by virtual layer name
        """
        valid_kernels = {}  # type: ValidKernelsType
        for virtual_layer_name in potential_layers:
            vlayer = context.layers.get(virtual_layer_name, None)
            if not isinstance(vlayer, layers.intel.Intel):
                continue
            for method in self.methods:
                # `methods` holds plain functions, so `self` is passed explicitly
                found = method(self, context, vlayer, progress_callback)
                if found:
                    # Merge rather than assign, so a later layer without a kernel
                    # cannot discard the kernel found in an earlier layer
                    valid_kernels.update(found)
                    break
        if not valid_kernels:
            vollog.info("No suitable kernels found during pdbscan")
        return valid_kernels

    def __call__(self,
                 context: interfaces.context.ContextInterface,
                 config_path: str,
                 requirement: interfaces.configuration.RequirementInterface,
                 progress_callback: constants.ProgressCallback = None) -> None:
        """Locates a Windows kernel and fulfills the unsatisfied symbol table
        requirements beneath `requirement`.

        Nothing is done when `requirement` is already satisfied.

        Args:
            context: Context on which to operate
            config_path: The path within the `context` for the `requirement`'s configuration variables
            requirement: The root of the requirement tree to fulfill
            progress_callback: Means of providing the user with feedback during long processes
        """
        if requirement.unsatisfied(context, config_path):
            # The kernel offset methods read pointers through the "pdbscan" native table
            if "pdbscan" not in context.symbol_space:
                context.symbol_space.append(native.NativeTable("pdbscan", native.std_ctypes))
            # TODO: check if this is a windows symbol requirement, otherwise ignore it
            self._symbol_requirements = self.find_requirements(context, config_path, requirement,
                                                               requirements.SymbolTableRequirement)
            potential_layers = self.find_virtual_layers_from_req(context = context,
                                                                 config_path = config_path,
                                                                 requirement = requirement)
            for sub_config_path, symbol_req in self._symbol_requirements:
                parent_path = interfaces.configuration.parent_path(sub_config_path)
                if symbol_req.unsatisfied(context, parent_path):
                    valid_kernels = self.determine_valid_kernels(context, potential_layers, progress_callback)
                    if valid_kernels:
                        self.recurse_symbol_fulfiller(context, valid_kernels, progress_callback)
                        self.set_kernel_virtual_offset(context, valid_kernels)