Source code for volatility.framework.automagic.linux

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import logging
from typing import List, Optional, Tuple, Type

from volatility.framework import interfaces, constants, exceptions, layers
from volatility.framework import symbols, objects
from volatility.framework.automagic import symbol_cache, symbol_finder
from volatility.framework.layers import intel, scanners
from volatility.framework.symbols import linux

vollog = logging.getLogger(__name__)


[docs]class LinuxBannerCache(symbol_cache.SymbolBannerCache): """Caches the banners found in the Linux symbol files.""" os = "linux" symbol_name = "linux_banner" banner_path = constants.LINUX_BANNERS_PATH
[docs]class LinuxSymbolFinder(symbol_finder.SymbolFinder): """Linux symbol loader based on uname signature strings.""" banner_config_key = "kernel_banner" banner_cache = LinuxBannerCache symbol_class = "volatility.framework.symbols.linux.LinuxKernelIntermedSymbols"
[docs]class LintelStacker(interfaces.automagic.StackerLayerInterface): stack_order = 12
[docs] @classmethod def stack(cls, context: interfaces.context.ContextInterface, layer_name: str, progress_callback: constants.ProgressCallback = None) -> Optional[interfaces.layers.DataLayerInterface]: """Attempts to identify linux within this layer.""" # Bail out by default unless we can stack properly layer = context.layers[layer_name] join = interfaces.configuration.path_join # Never stack on top of an intel layer # FIXME: Find a way to improve this check if isinstance(layer, intel.Intel): return None linux_banners = LinuxBannerCache.load_banners() # If we have no banners, don't bother scanning if not linux_banners: vollog.info("No Linux banners found - if this is a linux plugin, please check your symbol files location") return None mss = scanners.MultiStringScanner([x for x in linux_banners if x is not None]) for _, banner in layer.scan(context = context, scanner = mss, progress_callback = progress_callback): dtb = None vollog.debug("Identified banner: {}".format(repr(banner))) symbol_files = linux_banners.get(banner, None) if symbol_files: isf_path = symbol_files[0] table_name = context.symbol_space.free_table_name('LintelStacker') table = linux.LinuxKernelIntermedSymbols(context, 'temporary.' + table_name, name = table_name, isf_url = isf_path) context.symbol_space.append(table) kaslr_shift, _ = LinuxUtilities.find_aslr(context, table_name, layer_name, progress_callback = progress_callback) layer_class = intel.Intel # type: Type if 'init_level4_pgt' in table.symbols: layer_class = intel.Intel32e dtb_symbol_name = 'init_level4_pgt' else: dtb_symbol_name = 'swapper_pg_dir' dtb = LinuxUtilities.virtual_to_physical_address( table.get_symbol(dtb_symbol_name).address + kaslr_shift) # Build the new layer new_layer_name = context.layers.free_layer_name("IntelLayer") config_path = join("IntelHelper", new_layer_name) context.config[join(config_path, "memory_layer")] = layer_name context.config[join(config_path, "page_map_offset")] = dtb context.config[join(config_path, LinuxSymbolFinder.banner_config_key)] = str(banner, 'latin-1') layer = layer_class(context, config_path = config_path, name = new_layer_name) if layer and dtb: vollog.debug("DTB was found at: 0x{:0x}".format(dtb)) return layer return None
[docs]class LinuxUtilities(object): """Class with multiple useful linux functions.""" # based on __d_path from the Linux kernel @classmethod def _do_get_path(cls, rdentry, rmnt, dentry, vfsmnt) -> str: ret_path = [] # type: List[str] while dentry != rdentry or vfsmnt != rmnt: dname = dentry.path() if dname == "": break ret_path.insert(0, dname.strip('/')) if dentry == vfsmnt.get_mnt_root() or dentry == dentry.d_parent: if vfsmnt.get_mnt_parent() == vfsmnt: break dentry = vfsmnt.get_mnt_mountpoint() vfsmnt = vfsmnt.get_mnt_parent() continue parent = dentry.d_parent dentry = parent # if we did not gather any valid dentrys in the path, then the entire file is # either 1) smeared out of memory or 2) de-allocated and corresponding structures overwritten # we return an empty string in this case to avoid confusion with something like a handle to the root # directory (e.g., "/") if not ret_path: return "" ret_val = '/'.join([str(p) for p in ret_path if p != ""]) if ret_val.startswith(("socket:", "pipe:")): if ret_val.find("]") == -1: try: inode = dentry.d_inode ino = inode.i_ino except exceptions.InvalidAddressException: ino = 0 ret_val = ret_val[:-1] + ":[{0}]".format(ino) else: ret_val = ret_val.replace("/", "") elif ret_val != "inotify": ret_val = '/' + ret_val return ret_val # method used by 'older' kernels # TODO: lookup when dentry_operations->d_name was merged into the mainline kernel for exact version @classmethod def _get_path_file(cls, task, filp) -> str: rdentry = task.fs.get_root_dentry() rmnt = task.fs.get_root_mnt() dentry = filp.get_dentry() vfsmnt = filp.get_vfsmnt() return LinuxUtilities._do_get_path(rdentry, rmnt, dentry, vfsmnt) @classmethod def _get_new_sock_pipe_path(cls, context, task, filp) -> str: dentry = filp.get_dentry() sym_addr = dentry.d_op.d_dname symbs = list(context.symbol_space.get_symbols_by_location(sym_addr)) if len(symbs) == 1: sym = symbs[0].split(constants.BANG)[1] if sym == "sockfs_dname": pre_name = "socket" elif sym == "anon_inodefs_dname": pre_name = "anon_inode" elif sym == "pipefs_dname": pre_name = "pipe" elif sym == "simple_dname": pre_name = cls._get_path_file(task, filp) else: pre_name = "<unsupported d_op symbol: {0}>".format(sym) ret = "{0}:[{1:d}]".format(pre_name, dentry.d_inode.i_ino) else: ret = "<invalid d_dname pointer> {0:x}".format(sym_addr) return ret # a 'file' structure doesn't have enough information to properly restore its full path # we need the root mount information from task_struct to determine this
[docs] @classmethod def path_for_file(cls, context, task, filp) -> str: try: dentry = filp.get_dentry() except exceptions.InvalidAddressException: return "" if dentry == 0: return "" dname_is_valid = False # TODO COMPARE THIS IN LSOF OUTPUT TO VOL2 try: if dentry.d_op and dentry.d_op.has_member("d_dname") and dentry.d_op.d_dname: dname_is_valid = True except exceptions.InvalidAddressException: dname_is_valid = False if dname_is_valid: ret = LinuxUtilities._get_new_sock_pipe_path(context, task, filp) else: ret = LinuxUtilities._get_path_file(task, filp) return ret
[docs] @classmethod def files_descriptors_for_process(cls, config: interfaces.configuration.HierarchicalDict, context: interfaces.context.ContextInterface, task: interfaces.objects.ObjectInterface): fd_table = task.files.get_fds() if fd_table == 0: return max_fds = task.files.get_max_fds() # corruption check if max_fds > 500000: return file_type = config["vmlinux"] + constants.BANG + 'file' fds = objects.utility.array_of_pointers(fd_table, count = max_fds, subtype = file_type, context = context) for (fd_num, filp) in enumerate(fds): if filp != 0: full_path = LinuxUtilities.path_for_file(context, task, filp) yield fd_num, filp, full_path
[docs] @classmethod def aslr_mask_symbol_table(cls, context: interfaces.context.ContextInterface, symbol_table: str, layer_name: str, aslr_shift = 0): sym_table = context.symbol_space[symbol_table] sym_layer = context.layers[layer_name] if aslr_shift == 0: if not isinstance(sym_layer, layers.intel.Intel): raise TypeError("Layer name {} is not an intel space") aslr_layer = sym_layer.config['memory_layer'] _, aslr_shift = cls.find_aslr(context, symbol_table, aslr_layer) symbols.mask_symbol_table(sym_table, sym_layer.address_mask, aslr_shift)
[docs] @classmethod def find_aslr(cls, context: interfaces.context.ContextInterface, symbol_table: str, layer_name: str, progress_callback: constants.ProgressCallback = None) \ -> Tuple[int, int]: """Determines the offset of the actual DTB in physical space and its symbol offset.""" init_task_symbol = symbol_table + constants.BANG + 'init_task' init_task_json_address = context.symbol_space.get_symbol(init_task_symbol).address swapper_signature = rb"swapper(\/0|\x00\x00)\x00\x00\x00\x00\x00\x00" module = context.module(symbol_table, layer_name, 0) for offset in context.layers[layer_name].scan(scanner = scanners.RegExScanner(swapper_signature), context = context, progress_callback = progress_callback): task_symbol = module.get_type('task_struct') init_task_address = offset - task_symbol.relative_child_offset('comm') init_task = module.object(object_type = 'task_struct', offset = init_task_address, absolute = True) if init_task.pid != 0: continue elif init_task.has_member('state') and init_task.state.cast('unsigned int') != 0: continue # This we get for free aslr_shift = init_task.files.cast('long unsigned int') - module.get_symbol('init_files').address kaslr_shift = init_task_address - cls.virtual_to_physical_address(init_task_json_address) if aslr_shift & 0xfff != 0 or kaslr_shift & 0xfff != 0: continue vollog.debug("Linux ASLR shift values determined: physical {:0x} virtual {:0x}".format( kaslr_shift, aslr_shift)) return kaslr_shift, aslr_shift # We don't throw an exception, because we may legitimately not have an ASLR shift, but we report it vollog.debug("Scanners could not determine any ASLR shifts, using 0 for both") return 0, 0
[docs] @classmethod def virtual_to_physical_address(cls, addr: int) -> int: """Converts a virtual linux address to a physical one (does not account of ASLR)""" if addr > 0xffffffff80000000: return addr - 0xffffffff80000000 return addr - 0xc0000000