# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import binascii
import code
import random
import string
import struct
import sys
from typing import Any, Dict, List, Optional, Tuple, Union, Type
from volatility.cli import text_renderer
from volatility.framework import renderers, interfaces, objects, plugins, exceptions
from volatility.framework.configuration import requirements
from volatility.framework.layers import intel
# capstone is an optional dependency that is only needed for disassembly;
# record whether it could be imported so callers can degrade gracefully.
try:
    import capstone
except ImportError:
    has_capstone = False
else:
    has_capstone = True
class Volshell(interfaces.plugins.PluginInterface):
    """Shell environment to directly interact with a memory image.

    The entries returned by construct_locals() are injected, under each of
    their aliases, into the namespace of an interactive Python console that
    is started by run().
    """

    # NOTE: help() prints the __doc__ of every exposed command verbatim, so the
    # docstrings of those commands are deliberately kept to a single line and
    # further detail lives in ordinary comments.

    def __init__(self, *args, **kwargs):
        """Forwards all arguments to the base plugin and clears the current layer."""
        super().__init__(*args, **kwargs)
        # Single leading underscore on purpose: a double underscore would be
        # name-mangled to _Volshell__current_layer and would never be seen by
        # the current_layer property, which reads self._current_layer.
        self._current_layer = None  # type: Optional[str]

    @classmethod
    def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
        """Returns the plugin requirements: a single 'primary' translation layer (Intel32 or Intel64)."""
        return [
            requirements.TranslationLayerRequirement(name = 'primary',
                                                     description = 'Memory layer for the kernel',
                                                     architectures = ["Intel32", "Intel64"])
        ]

    def run(self, additional_locals: Dict[str, Any] = None) -> interfaces.renderers.TreeGrid:
        """Runs the interactive volshell plugin.

        Args:
            additional_locals: Optional extra names to expose in the console,
                added on top of the entries from construct_locals()

        Returns:
            Return a TreeGrid but this is always empty since the point of this plugin is to run interactively
        """
        self._current_layer = self.config['primary']

        # A single namespace is shared by the completer and the console so that
        # tab completion also knows about names defined during the session.
        namespace = self._construct_locals_dict()
        if additional_locals:
            namespace.update(additional_locals)

        # Try to enable tab completion
        try:
            import readline
        except ImportError:
            pass
        else:
            import rlcompleter
            completer = rlcompleter.Completer(namespace = namespace)
            readline.set_completer(completer.complete)
            readline.parse_and_bind("tab: complete")
            print("Readline imported successfully")

        # TODO: provide help, consider generic functions (pslist?) and/or providing windows/linux functions

        # The mode is the capitalised last component of the defining module's name
        mode = self.__module__.split('.')[-1]
        mode = mode[0].upper() + mode[1:]

        banner = ("\n"
                  "Call help() to see available functions\n"
                  "Volshell mode: {}\n"
                  "Current Layer: {}\n").format(mode, self.current_layer)

        sys.ps1 = "({}) >>> ".format(self.current_layer)
        code.interact(banner = banner, local = namespace)

        return renderers.TreeGrid([("Terminating", str)], None)

    def help(self, *args):
        """Describes the available commands"""
        # With arguments, defer to the builtin help() (the method name does not
        # shadow the builtin inside this body).
        if args:
            help(*args)
            return

        variables = []
        print("\nMethods:")
        for aliases, item in self.construct_locals():
            name = ", ".join(aliases)
            if item.__doc__ and callable(item):
                print("* {}".format(name))
                print(" {}".format(item.__doc__))
            else:
                # Anything undocumented or not callable is listed as a variable
                variables.append(name)

        print("\nVariables:")
        for var in variables:
            print(" {}".format(var))

    def construct_locals(self) -> List[Tuple[List[str], Any]]:
        """Returns a list of (aliases, object) pairs to be added to the environment."""
        return [
            (['dt', 'display_type'], self.display_type),
            (['db', 'display_bytes'], self.display_bytes),
            (['dw', 'display_words'], self.display_words),
            (['dd', 'display_doublewords'], self.display_doublewords),
            (['dq', 'display_quadwords'], self.display_quadwords),
            (['dis', 'disassemble'], self.disassemble),
            (['cl', 'change_layer'], self.change_layer),
            (['context'], self.context),
            (['self'], self),
            (['dpo', 'display_plugin_output'], self.display_plugin_output),
            (['gt', 'generate_treegrid'], self.generate_treegrid),
            (['rt', 'render_treegrid'], self.render_treegrid),
            (['ds', 'display_symbols'], self.display_symbols),
            (['hh', 'help'], self.help),
        ]

    def _construct_locals_dict(self) -> Dict[str, Any]:
        """Returns a dictionary mapping every alias from construct_locals() to its object."""
        return {alias: value for aliases, value in self.construct_locals() for alias in aliases}

    def _read_data(self, offset, count = 128, layer_name = None):
        """Reads count bytes at offset from layer_name (or the current layer) for the display_* methods."""
        return self.context.layers[layer_name or self.current_layer].read(offset, count)

    def _display_data(self, offset: int, remaining_data: bytes, format_string: str = "B", ascii: bool = True):
        """Prints a hex dump of the data, 16 bytes per row.

        Args:
            offset: Address used to label the first row
            remaining_data: The bytes to display
            format_string: struct format of a single displayed value
            ascii: Whether to append a character rendering of each value
        """
        chunk_size = struct.calcsize(format_string)
        # Trailing bytes that do not fill a whole value are dropped
        data_length = len(remaining_data)
        remaining_data = remaining_data[:data_length - (data_length % chunk_size)]

        hex_template = "{:0" + str(2 * chunk_size) + "x}"
        # Single-byte values are rendered as one run of characters
        connector = "" if chunk_size < 2 else " "

        for line_start in range(0, len(remaining_data), 16):
            current_line = remaining_data[line_start:line_start + 16]

            data_blocks = [current_line[chunk_size * i:chunk_size * (i + 1)] for i in range(16 // chunk_size)]
            data_blocks = [x for x in data_blocks if x != b'']

            valid_data = [hex_template.format(struct.unpack(format_string, x)[0]) for x in data_blocks]
            # Pad a short final row so the character column stays aligned
            padding_data = [" " * 2 * chunk_size for _ in range((16 - len(current_line)) // chunk_size)]
            hex_data = " ".join(valid_data + padding_data)

            ascii_data = ""
            if ascii:
                ascii_data = connector.join([self._ascii_bytes(x) for x in valid_data])

            # Label the row with the address of its own first byte (the
            # address must not be advanced before the row is printed).
            print(hex(offset + line_start), " ", hex_data, " ", ascii_data)

    @staticmethod
    def _ascii_bytes(bytes):
        """Converts a string of hex digits into an ascii string ('.' for anything outside 33-126)."""
        # NOTE(review): a space (0x20) is rendered as '.', unlike most hex
        # dump tools - confirm whether that is intended.
        return "".join([chr(x) if 32 < x < 127 else '.' for x in binascii.unhexlify(bytes)])

    @property
    def current_layer(self):
        """The name of the layer used when a command is not given an explicit layer_name."""
        return self._current_layer

    def change_layer(self, layer_name = None):
        """Changes the current default layer"""
        # Falls back to the configured primary layer when no name is given
        if not layer_name:
            layer_name = self.config['primary']
        self._current_layer = layer_name
        sys.ps1 = "({}) >>> ".format(self.current_layer)

    def display_bytes(self, offset, count = 128, layer_name = None):
        """Displays byte values and ASCII characters"""
        remaining_data = self._read_data(offset, count = count, layer_name = layer_name)
        self._display_data(offset, remaining_data)

    def display_quadwords(self, offset, count = 128, layer_name = None):
        """Displays quad-word values (8 bytes) and corresponding ASCII characters"""
        remaining_data = self._read_data(offset, count = count, layer_name = layer_name)
        self._display_data(offset, remaining_data, format_string = "Q")

    def display_doublewords(self, offset, count = 128, layer_name = None):
        """Displays double-word values (4 bytes) and corresponding ASCII characters"""
        remaining_data = self._read_data(offset, count = count, layer_name = layer_name)
        self._display_data(offset, remaining_data, format_string = "I")

    def display_words(self, offset, count = 128, layer_name = None):
        """Displays word values (2 bytes) and corresponding ASCII characters"""
        remaining_data = self._read_data(offset, count = count, layer_name = layer_name)
        self._display_data(offset, remaining_data, format_string = "H")

    def disassemble(self, offset, count = 128, layer_name = None, architecture = None):
        """Disassembles the code found in count bytes at offset"""
        if not has_capstone:
            print("Capstone not available - please install it to use the disassemble command")
            return

        # Only guess the architecture from the layer when the caller did not
        # ask for a specific one; an explicit request is honoured as given.
        if architecture is None:
            layer = self.context.layers[layer_name or self.current_layer]
            # Intel32e is tested before the more general Intel check
            # (presumably because Intel32e derives from Intel - confirm).
            if isinstance(layer, intel.Intel32e):
                architecture = 'intel64'
            elif isinstance(layer, intel.Intel):
                architecture = 'intel'

        # Nothing requested and nothing detected: there is nothing to do
        if architecture is None:
            return

        # (arch, mode) pairs; the disassembler is only built for the one in use
        disasm_modes = {
            'intel': (capstone.CS_ARCH_X86, capstone.CS_MODE_32),
            'intel64': (capstone.CS_ARCH_X86, capstone.CS_MODE_64),
            'arm': (capstone.CS_ARCH_ARM, capstone.CS_MODE_ARM),
            'arm64': (capstone.CS_ARCH_ARM64, capstone.CS_MODE_ARM)
        }
        if architecture not in disasm_modes:
            print("Unsupported architecture {!r} - choose one of: {}".format(architecture,
                                                                            ", ".join(sorted(disasm_modes))))
            return

        remaining_data = self._read_data(offset, count = count, layer_name = layer_name)
        disassembler = capstone.Cs(*disasm_modes[architecture])
        for i in disassembler.disasm(remaining_data, offset):
            print("0x%x:\t%s\t%s" % (i.address, i.mnemonic, i.op_str))

    def display_type(self,
                     object: Union[str, interfaces.objects.ObjectInterface, interfaces.objects.Template],
                     offset: Optional[int] = None):
        """Display Type describes the members of a particular object, ordered by offset"""
        if not isinstance(object, (str, interfaces.objects.ObjectInterface, interfaces.objects.Template)):
            print("Cannot display information about non-type object")
            return

        if isinstance(object, str):
            if offset is None:
                # A bare type name: look the type up in the symbol space
                object = self.context.symbol_space.get_type(object)
            else:
                # A type name plus an offset: instantiate it on the current layer
                object = self.context.object(object, layer_name = self.current_layer, offset = offset)
        elif offset is not None:
            # An existing object/template plus an offset: re-instantiate its type there
            object = self.context.object(object.vol.type_name, layer_name = self.current_layer, offset = offset)

        if hasattr(object.vol, 'size'):
            print("{} ({} bytes)".format(object.vol.type_name, object.vol.size))
        elif hasattr(object.vol, 'data_format'):
            data_format = object.vol.data_format
            print("{} ({} bytes, {} endian, {})".format(object.vol.type_name, data_format.length, data_format.byteorder,
                                                        'signed' if data_format.signed else 'unsigned'))

        if not hasattr(object.vol, 'members'):
            return

        members = object.vol.members

        # First pass: find the column widths
        longest_member = longest_offset = longest_typename = 0
        for member in members:
            relative_offset, member_type = members[member]
            longest_member = max(len(member), longest_member)
            longest_offset = max(len(hex(relative_offset)), longest_offset)
            longest_typename = max(len(member_type.vol.type_name), longest_typename)

        is_instance = isinstance(object, interfaces.objects.ObjectInterface)

        # Second pass: print the members sorted by (offset, name)
        for member in sorted(members, key = lambda x: (members[x][0], x)):
            relative_offset, member_type = members[member]
            len_offset = len(hex(relative_offset))
            len_member = len(member)
            len_typename = len(member_type.vol.type_name)
            if is_instance:
                # We're an instance, so also display the data
                print(" " * (longest_offset - len_offset), hex(relative_offset), ": ", member,
                      " " * (longest_member - len_member), " ", member_type.vol.type_name,
                      " " * (longest_typename - len_typename), " ", self._display_value(getattr(object, member)))
            else:
                print(" " * (longest_offset - len_offset), hex(relative_offset), ": ", member,
                      " " * (longest_member - len_member), " ", member_type.vol.type_name)

    @classmethod
    def _display_value(cls, value: Any) -> str:
        """Returns the text shown for a member value.

        Primitive objects are shown by their repr, arrays as the repr of a
        list of their rendered elements, and anything else by the hex of its
        offset.
        """
        if isinstance(value, objects.PrimitiveObject):
            return repr(value)
        if isinstance(value, objects.Array):
            return repr([cls._display_value(val) for val in value])
        return hex(value.vol.offset)

    def consume_file(self, file: interfaces.plugins.FileInterface) -> None:
        """Dummy file consumer: the file is accepted and discarded."""
        pass

    def generate_treegrid(self, plugin: Type[interfaces.plugins.PluginInterface],
                          **kwargs) -> Optional[interfaces.renderers.TreeGrid]:
        """Generates a TreeGrid based on a specific plugin passing in kwarg configuration values"""
        # Returns None (after printing the reason) when the plugin's
        # requirements cannot be satisfied.
        path_join = interfaces.configuration.path_join

        # Generate a temporary configuration path
        plugin_config_suffix = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(32))
        plugin_path = path_join(self.config_path, plugin_config_suffix)

        # Populate the configuration
        for name, value in kwargs.items():
            self.config[path_join(plugin_config_suffix, plugin.__name__, name)] = value

        try:
            constructed = plugins.construct_plugin(self.context, [], plugin, plugin_path, None, NullFileConsumer())
            return constructed.run()
        except exceptions.UnsatisfiedException as excp:
            print("Unable to validate the plugin requirements: {}\n".format(list(excp.unsatisfied)))
            return None

    def render_treegrid(self,
                        treegrid: interfaces.renderers.TreeGrid,
                        renderer: Optional[interfaces.renderers.Renderer] = None) -> None:
        """Renders a treegrid as produced by generate_treegrid"""
        # Defaults to the quick text renderer when none is supplied
        if renderer is None:
            renderer = text_renderer.QuickTextRenderer()
        renderer.render(treegrid)

    def display_plugin_output(self, plugin: Type[interfaces.plugins.PluginInterface], **kwargs) -> None:
        """Displays the output for a particular plugin (with keyword arguments)"""
        treegrid = self.generate_treegrid(plugin, **kwargs)
        # generate_treegrid has already reported the problem when it yields
        # None, so there is nothing to hand to the renderer.
        if treegrid is not None:
            self.render_treegrid(treegrid)

    def display_symbols(self, symbol_table: str = None):
        """Prints an alphabetical list of symbols for a symbol table"""
        if symbol_table is None:
            print("No symbol table provided")
            return

        table = self.context.symbol_space[symbol_table]

        # First pass: find the width of the address column
        longest_offset = longest_name = 0
        for symbol_name in table.symbols:
            symbol = table.get_symbol(symbol_name)
            longest_offset = max(longest_offset, len(hex(symbol.address)))
            longest_name = max(longest_name, len(symbol.name))

        # Second pass: print right-aligned addresses followed by the names
        for symbol_name in sorted(table.symbols):
            symbol = table.get_symbol(symbol_name)
            len_offset = len(hex(symbol.address))
            print(" " * (longest_offset - len_offset), hex(symbol.address), " ", symbol.name)
class NullFileConsumer(interfaces.plugins.FileConsumerInterface):
    """File consumer that silently discards every file handed to it."""

    def consume_file(self, file: interfaces.plugins.FileInterface) -> None:
        """Accepts the file and deliberately does nothing with it."""
        return None