Source code for volatility.framework.layers.resources

# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#

import bz2
import contextlib
import gzip
import hashlib
import logging
import lzma
import os
import ssl
import urllib.parse
import urllib.request
import zipfile
from typing import Optional

from volatility import framework
from volatility.framework import constants

try:
    import magic

    HAS_MAGIC = True
except ImportError:
    HAS_MAGIC = False

try:
    import smb.SMBHandler
except ImportError:
    pass

vollog = logging.getLogger(__name__)

# TODO: Type-annotating the ResourceAccessor.open method is difficult because HTTPResponse is not actually an IO[Any] type
#   fix this


class ResourceAccessor(object):
    """Object for opening URLs as files (downloading locally first if necessary)."""

    def __init__(self,
                 progress_callback: Optional[constants.ProgressCallback] = None,
                 context: Optional[ssl.SSLContext] = None) -> None:
        """Creates a resource accessor.

        Args:
            progress_callback: Optional callable invoked while a resource is copied into the local cache;
                it receives the fraction of data read so far and a description string
            context: SSL context passed to urllib when opening URLs

        Note: context is an SSL context, not a volatility context
        """
        self._progress_callback = progress_callback
        self._context = context
        # Every known subclass of BaseHandler is offered to urllib as a potential URL handler
        self._handlers = list(framework.class_subclasses(urllib.request.BaseHandler))
        vollog.log(constants.LOGLEVEL_VVV,
                   "Available URL handlers: {}".format(", ".join([x.__name__ for x in self._handlers])))

    def open(self, url: str, mode: str = "rb"):
        """Returns a file-like object for a particular URL opened in mode.

        URLs using the file scheme are returned as opened by urllib.  Any other scheme is first
        copied into constants.CACHE_PATH (under a name derived from the SHA-512 of the URL) and the
        cached copy is opened instead.  xz, bzip2 and gzip layers are then unwrapped, based on libmagic
        detection when available and on the URL's file extension otherwise.

        Args:
            url: The URL of the resource to open
            mode: The mode handed to the decompression wrappers (the underlying file is always binary)

        Returns:
            A file-like object positioned at the start of the (decompressed) data

        Raises:
            ValueError: If no file object could be produced for the URL
        """
        urllib.request.install_opener(urllib.request.build_opener(*self._handlers))
        parsed_url = urllib.parse.urlparse(url)

        fp = urllib.request.urlopen(url, context = self._context)
        if parsed_url.scheme == 'file':
            # Local files can be used directly, so there is no need to open a second handle
            raw_file = fp
        else:
            # ZipExtFiles (files in zips) cannot seek, so must be cached in order to use and/or decompress
            with contextlib.closing(fp):
                temp_filename = self._cache_locally(url, fp)
            # Re-open the cache with a different mode
            raw_file = open(temp_filename, mode = "rb")

        try:
            curfile = self._unwrap_compression(raw_file, parsed_url.path, mode)
        except Exception:
            # Do not leak the underlying handle if a decompression layer could not be set up
            raw_file.close()
            raise

        # Fallback in case the file doesn't exist
        if curfile is None:
            raise ValueError("URL does not reference an openable file")
        return curfile

    def _cache_locally(self, url, fp):
        """Copies the contents of fp into the cache directory and returns the cached filename.

        An existing cache entry is reused without reading fp.  New entries are written to a temporary
        name and only moved into place once complete, so an interrupted download cannot be mistaken
        for a valid cache entry on a later call.
        """
        # TODO: find a way to check if we already have this file (look at http headers?)
        # NOTE(review): 1028 looks like a typo for 1024 - confirm before changing
        block_size = 1028 * 8
        temp_filename = os.path.join(constants.CACHE_PATH,
                                     "data_" + hashlib.sha512(bytes(url, 'latin-1')).hexdigest())
        if os.path.exists(temp_filename):
            return temp_filename

        vollog.debug("Caching file at: {}".format(temp_filename))

        try:
            content_length = int(fp.info().get('Content-Length', -1))
        except (AttributeError, TypeError, ValueError):
            # If our fp doesn't have an info member (or the header is unusable), carry on gracefully
            content_length = -1

        # The process id keeps concurrent downloads of the same URL from sharing a partial file
        partial_filename = "{}.{}.part".format(temp_filename, os.getpid())
        try:
            with open(partial_filename, "wb") as cache_file:
                count = 0
                while True:
                    block = fp.read(block_size)
                    if not block:
                        break
                    count += len(block)
                    if self._progress_callback:
                        # max() guards against a missing or understated Content-Length
                        self._progress_callback(count / max(count, content_length),
                                                "Reading file {}".format(url))
                    cache_file.write(block)
            os.replace(partial_filename, temp_filename)
        except BaseException:
            # Remove the incomplete download (also on KeyboardInterrupt), then let the error propagate
            with contextlib.suppress(OSError):
                os.remove(partial_filename)
            raise
        return temp_filename

    @staticmethod
    def _unwrap_compression(curfile, url_path, mode):
        """Wraps curfile in a decompressor for each xz/bzip2/gzip layer found and returns the result."""
        # Determine whether the file is a particular type of file, and if so, open it as such
        magic_usable = False
        if HAS_MAGIC:
            while True:
                detected = None
                try:
                    # Detect the content
                    detected = magic.detect_from_fobj(curfile)
                    magic_usable = True
                except AttributeError:
                    # This is because python-magic and file provide a magic module
                    # Only file's python has magic.detect_from_fobj
                    pass
                except Exception as excp:
                    # Detection is best-effort: note the failure and stop looking for further layers
                    vollog.log(constants.LOGLEVEL_VVV, "Unable to detect file type: {}".format(excp))

                if not detected:
                    break
                if detected.mime_type == 'application/x-xz':
                    curfile = lzma.LZMAFile(curfile, mode)
                elif detected.mime_type == 'application/x-bzip2':
                    curfile = bz2.BZ2File(curfile, mode)
                elif detected.mime_type == 'application/x-gzip':
                    curfile = gzip.GzipFile(fileobj = curfile, mode = mode)
                else:
                    break

                # Read and rewind to ensure we're inside any compressed file layers
                curfile.read(1)
                curfile.seek(0)

        if not magic_usable:
            # Somewhat of a hack, but prevents a hard dependency on the magic module
            while True:
                if url_path.endswith(".xz"):
                    curfile = lzma.LZMAFile(curfile, mode)
                elif url_path.endswith(".bz2"):
                    curfile = bz2.BZ2File(curfile, mode)
                elif url_path.endswith(".gz"):
                    curfile = gzip.GzipFile(fileobj = curfile, mode = mode)
                else:
                    break
                # Strip the extension just handled so that stacked extensions are unwrapped in turn
                url_path = url_path.rsplit(".", 1)[0]

        return curfile
class JarHandler(urllib.request.BaseHandler):
    """URL handler that resolves jar scheme URIs to members of local zip archives.

    The accepted form is ``jar:file:<archive path>!<member path>``.

    Reference used for the schema syntax:
    http://docs.netkernel.org/book/view/book:mod:reference/doc:layer1:schemes:jar

    Actual reference (found from https://www.w3.org/wiki/UriSchemes/jar) seemed not to return:
    http://developer.java.sun.com/developer/onlineTraining/protocolhandlers/
    """

    @staticmethod
    def default_open(req):
        """Opens the archive member named by a jar request.

        Returns the opened member, or None when the request is not a jar request, uses a subscheme
        other than file, or does not contain exactly one "!" separator.
        """
        # Anything that is not a jar request is left for other handlers
        if req.type != 'jar':
            return None

        # full_url looks like jar:<subscheme>:<remainder>; the remainder may itself contain colons
        pieces = req.full_url.split(":")
        subscheme = pieces[1]
        remainder = ":".join(pieces[2:])

        if subscheme != 'file':
            vollog.log(constants.LOGLEVEL_VVV, "Unsupported jar subscheme {}".format(subscheme))
            return None

        # The "!" separates the archive on disk from the member inside it
        zipsplit = remainder.split("!")
        if len(zipsplit) != 2:
            vollog.log(constants.LOGLEVEL_VVV,
                       "Path did not contain exactly one fragment indicator: {}".format(remainder))
            return None

        archive_path, member_path = zipsplit
        archive = zipfile.ZipFile(archive_path)
        return archive.open(member_path)