# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import enum
import logging
from typing import Dict, Generator, List, Optional, Tuple, Callable
from volatility.framework import constants, interfaces, renderers, exceptions, symbols
from volatility.framework.configuration import requirements
from volatility.framework.interfaces import plugins, configuration
from volatility.framework.layers import scanners
from volatility.framework.renderers import format_hints
from volatility.framework.symbols import intermed
from volatility.framework.symbols.windows import extensions
from volatility.plugins.windows import handles
vollog = logging.getLogger(__name__)
# TODO: When python3.5 is no longer supported, make this enum.IntFlag
[docs]class PoolType(enum.IntEnum):
"""Class to maintain the different possible PoolTypes The values must be
integer powers of 2."""
PAGED = 1
NONPAGED = 2
FREE = 4
[docs]class PoolConstraint:
"""Class to maintain tag/size/index/type information about Pool header
tags."""
def __init__(self,
tag: bytes,
type_name: str,
object_type: Optional[str] = None,
page_type: Optional[PoolType] = None,
size: Optional[Tuple[Optional[int], Optional[int]]] = None,
index: Optional[Tuple[Optional[int], Optional[int]]] = None,
alignment: Optional[int] = 1) -> None:
self.tag = tag
self.type_name = type_name
self.object_type = object_type
self.page_type = page_type
self.size = size
self.index = index
self.alignment = alignment
[docs]def os_distinguisher(version_check: Callable[[Tuple[int, ...]], bool],
fallback_checks: List[Tuple[str, Optional[str], bool]]
) -> Callable[[interfaces.context.ContextInterface, str], bool]:
"""Distinguishes a symbol table as being above a particular version or
point.
This will primarily check the version metadata first and foremost.
If that metadata isn't available then each item in the fallback_checks is tested.
If invert is specified then the result will be true if the version is less than that specified, or in the case of
fallback, if any of the fallback checks is successful.
A fallback check is made up of:
* a symbol or type name
* a member name (implying that the value before was a type name)
* whether that symbol, type or member must be present or absent for the symbol table to be more above the required point
Note:
Specifying that a member must not be present includes the whole type not being present too (ie, either will pass the test)
Args:
version_check: Function that takes a 4-tuple version and returns whether whether the provided version is above a particular point
fallback_checks: A list of symbol/types/members of types, and whether they must be present to be above the required point
Returns:
A function that takes a context and a symbol table name and determines whether that symbol table passes the distinguishing checks
"""
# try the primary method based on the pe version in the ISF
def method(context: interfaces.context.ContextInterface, symbol_table: str) -> bool:
"""
Args:
context: The context that contains the symbol table named `symbol_table`
symbol_table: Name of the symbol table within the context to distinguish the version of
Returns:
True if the symbol table is of the required version
"""
try:
pe_version = context.symbol_space[symbol_table].metadata.pe_version
major, minor, revision, build = pe_version
return version_check((major, minor, revision, build))
except (AttributeError, ValueError, TypeError):
vollog.log(constants.LOGLEVEL_VVV, "Windows PE version data is not available")
if not fallback_checks:
raise ValueError("No fallback methods for os_distinguishing provided")
# fall back to the backup method, if necessary
for name, member, response in fallback_checks:
if member is None:
if (context.symbol_space.has_symbol(symbol_table + constants.BANG + name)
or context.symbol_space.has_type(symbol_table + constants.BANG + name)) != response:
return False
else:
try:
symbol_type = context.symbol_space.get_type(symbol_table + constants.BANG + name)
if symbol_type.has_member(member) != response:
return False
except exceptions.SymbolError:
if not response:
return False
return True
return method
[docs]class PoolScanner(plugins.PluginInterface):
"""A generic pool scanner plugin."""
_version = (1, 0, 0)
[docs] @classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
return [
requirements.TranslationLayerRequirement(name = 'primary',
description = 'Memory layer for the kernel',
architectures = ["Intel32", "Intel64"]),
requirements.SymbolTableRequirement(name = "nt_symbols", description = "Windows kernel symbols"),
requirements.PluginRequirement(name = 'handles', plugin = handles.Handles, version = (1, 0, 0)),
]
is_windows_10 = os_distinguisher(version_check = lambda x: x >= (10, 0),
fallback_checks = [("ObHeaderCookie", None, True)])
is_windows_8_or_later = os_distinguisher(version_check = lambda x: x >= (6, 2),
fallback_checks = [("_HANDLE_TABLE", "HandleCount", False)])
# Technically, this is win7 or less
is_windows_7 = os_distinguisher(version_check = lambda x: x == (6, 1),
fallback_checks = [("_OBJECT_HEADER", "TypeIndex", True),
("_HANDLE_TABLE", "HandleCount", True)])
def _generator(self):
symbol_table = self.config["nt_symbols"]
constraints = self.builtin_constraints(symbol_table)
for constraint, mem_object, header in self.generate_pool_scan(self.context, self.config["primary"],
symbol_table, constraints):
# generate some type-specific info for sanity checking
if constraint.object_type == "Process":
name = mem_object.ImageFileName.cast("string",
max_length = mem_object.ImageFileName.vol.count,
errors = "replace")
elif constraint.object_type == "File":
try:
name = mem_object.FileName.String
except exceptions.InvalidAddressException:
vollog.log(constants.LOGLEVEL_VVV, "Skipping file at {0:#x}".format(mem_object.vol.offset))
continue
else:
name = renderers.NotApplicableValue()
yield (0, (constraint.type_name, format_hints.Hex(header.vol.offset), header.vol.layer_name, name))
[docs] @staticmethod
def builtin_constraints(symbol_table: str, tags_filter: List[bytes] = None) -> List[PoolConstraint]:
"""Get built-in PoolConstraints given a list of pool tags.
The tags_filter is a list of pool tags, and the associated
PoolConstraints are returned. If tags_filter is empty or
not supplied, then all builtin constraints are returned.
Args:
symbol_table: The name of the symbol table to prepend to the types used
tags_filter: List of tags to return or None to return all
Returns:
A list of well-known constructed PoolConstraints that match the provided tags
"""
builtins = [
# atom tables
PoolConstraint(b'AtmT',
type_name = symbol_table + constants.BANG + "_RTL_ATOM_TABLE",
size = (200, None),
page_type = PoolType.PAGED | PoolType.NONPAGED | PoolType.FREE),
# processes on windows before windows 8
PoolConstraint(b'Pro\xe3',
type_name = symbol_table + constants.BANG + "_EPROCESS",
object_type = "Process",
size = (600, None),
page_type = PoolType.PAGED | PoolType.NONPAGED | PoolType.FREE),
# processes on windows starting with windows 8
PoolConstraint(b'Proc',
type_name = symbol_table + constants.BANG + "_EPROCESS",
object_type = "Process",
size = (600, None),
page_type = PoolType.PAGED | PoolType.NONPAGED | PoolType.FREE),
# files on windows before windows 8
PoolConstraint(b'Fil\xe5',
type_name = symbol_table + constants.BANG + "_FILE_OBJECT",
object_type = "File",
size = (150, None),
page_type = PoolType.PAGED | PoolType.NONPAGED | PoolType.FREE),
# files on windows starting with windows 8
PoolConstraint(b'File',
type_name = symbol_table + constants.BANG + "_FILE_OBJECT",
object_type = "File",
size = (150, None),
page_type = PoolType.PAGED | PoolType.NONPAGED | PoolType.FREE),
# mutants on windows before windows 8
PoolConstraint(b'Mut\xe1',
type_name = symbol_table + constants.BANG + "_KMUTANT",
object_type = "Mutant",
size = (64, None),
page_type = PoolType.PAGED | PoolType.NONPAGED | PoolType.FREE),
# mutants on windows starting with windows 8
PoolConstraint(b'Muta',
type_name = symbol_table + constants.BANG + "_KMUTANT",
object_type = "Mutant",
size = (64, None),
page_type = PoolType.PAGED | PoolType.NONPAGED | PoolType.FREE),
# drivers on windows before windows 8
PoolConstraint(b'Dri\xf6',
type_name = symbol_table + constants.BANG + "_DRIVER_OBJECT",
object_type = "Driver",
size = (248, None),
page_type = PoolType.PAGED | PoolType.NONPAGED | PoolType.FREE),
# drivers on windows starting with windows 8
PoolConstraint(b'Driv',
type_name = symbol_table + constants.BANG + "_DRIVER_OBJECT",
object_type = "Driver",
size = (248, None),
page_type = PoolType.PAGED | PoolType.NONPAGED | PoolType.FREE),
# kernel modules
PoolConstraint(b'MmLd',
type_name = symbol_table + constants.BANG + "_LDR_DATA_TABLE_ENTRY",
size = (76, None),
page_type = PoolType.PAGED | PoolType.NONPAGED | PoolType.FREE),
# symlinks on windows before windows 8
PoolConstraint(b'Sym\xe2',
type_name = symbol_table + constants.BANG + "_OBJECT_SYMBOLIC_LINK",
object_type = "SymbolicLink",
size = (72, None),
page_type = PoolType.PAGED | PoolType.NONPAGED | PoolType.FREE),
# symlinks on windows starting with windows 8
PoolConstraint(b'Symb',
type_name = symbol_table + constants.BANG + "_OBJECT_SYMBOLIC_LINK",
object_type = "SymbolicLink",
size = (72, None),
page_type = PoolType.PAGED | PoolType.NONPAGED | PoolType.FREE),
# registry hives
PoolConstraint(b'CM10',
type_name = symbol_table + constants.BANG + "_CMHIVE",
size = (800, None),
page_type = PoolType.PAGED | PoolType.NONPAGED | PoolType.FREE),
]
if not tags_filter:
return builtins
return [constraint for constraint in builtins if constraint.tag in tags_filter]
[docs] @classmethod
def generate_pool_scan(cls,
context: interfaces.context.ContextInterface,
layer_name: str,
symbol_table: str,
constraints: List[PoolConstraint]) \
-> Generator[Tuple[
PoolConstraint, interfaces.objects.ObjectInterface, interfaces.objects.ObjectInterface], None, None]:
"""
Args:
context: The context to retrieve required elements (layers, symbol tables) from
layer_name: The name of the layer on which to operate
symbol_table: The name of the table containing the kernel symbols
constraints: List of pool constraints used to limit the scan results
Returns:
Iterable of tuples, containing the constraint that matched, the object from memory, the object header used to determine the object
"""
# get the object type map
type_map = handles.Handles.get_type_map(context = context, layer_name = layer_name, symbol_table = symbol_table)
cookie = handles.Handles.find_cookie(context = context, layer_name = layer_name, symbol_table = symbol_table)
is_windows_10 = cls.is_windows_10(context, symbol_table)
is_windows_8_or_later = cls.is_windows_8_or_later(context, symbol_table)
# start off with the primary virtual layer
scan_layer = layer_name
# switch to a non-virtual layer if necessary
if not is_windows_10:
scan_layer = context.layers[scan_layer].config['memory_layer']
for constraint, header in cls.pool_scan(context, scan_layer, symbol_table, constraints, alignment = 8):
mem_object = header.get_object(type_name = constraint.type_name,
type_map = type_map,
use_top_down = is_windows_8_or_later,
object_type = constraint.object_type,
native_layer_name = 'primary',
cookie = cookie)
if mem_object is None:
vollog.log(constants.LOGLEVEL_VVV, "Cannot create an instance of {}".format(constraint.type_name))
continue
yield constraint, mem_object, header
[docs] @classmethod
def pool_scan(cls,
context: interfaces.context.ContextInterface,
layer_name: str,
symbol_table: str,
pool_constraints: List[PoolConstraint],
alignment: int = 8,
progress_callback: Optional[constants.ProgressCallback] = None) \
-> Generator[Tuple[PoolConstraint, interfaces.objects.ObjectInterface], None, None]:
"""Returns the _POOL_HEADER object (based on the symbol_table template)
after scanning through layer_name returning all headers that match any
of the constraints provided. Only one constraint can be provided per
tag.
Args:
context: The context to retrieve required elements (layers, symbol tables) from
layer_name: The name of the layer on which to operate
symbol_table: The name of the table containing the kernel symbols
pool_constraints: List of pool constraints used to limit the scan results
alignment: An optional value that all pool headers will be aligned to
progress_callback: An optional function to provide progress feedback whilst scanning
Returns:
An Iterable of pool constraints and the pool headers associated with them
"""
# Setup the pattern
constraint_lookup = {} # type: Dict[bytes, PoolConstraint]
for constraint in pool_constraints:
if constraint.tag in constraint_lookup:
raise ValueError("Constraint tag is used for more than one constraint: {}".format(constraint.tag))
constraint_lookup[constraint.tag] = constraint
module = cls._get_pool_header_module(context, layer_name, symbol_table)
# Run the scan locating the offsets of a particular tag
layer = context.layers[layer_name]
scanner = PoolHeaderScanner(module, constraint_lookup, alignment)
yield from layer.scan(context, scanner, progress_callback)
@classmethod
def _get_pool_header_module(cls, context, layer_name, symbol_table):
# Setup the pool header and offset differential
try:
module = context.module(symbol_table, layer_name, offset = 0)
module.get_type("_POOL_HEADER")
except exceptions.SymbolError:
# We have to manually load a symbol table
if symbols.symbol_table_is_64bit(context, symbol_table):
is_win_7 = cls.is_windows_7(context, symbol_table)
if is_win_7:
pool_header_json_filename = "poolheader-x64-win7"
else:
pool_header_json_filename = "poolheader-x64"
else:
pool_header_json_filename = "poolheader-x86"
new_table_name = intermed.IntermediateSymbolTable.create(
context = context,
config_path = configuration.path_join(context.symbol_space[symbol_table].config_path, "poolheader"),
sub_path = "windows",
filename = pool_header_json_filename,
table_mapping = {'nt_symbols': symbol_table},
class_types = {'_POOL_HEADER': extensions.POOL_HEADER})
module = context.module(new_table_name, layer_name, offset = 0)
return module
[docs] def run(self) -> renderers.TreeGrid:
return renderers.TreeGrid([("Tag", str), ("Offset", format_hints.Hex), ("Layer", str), ("Name", str)],
self._generator())