# This file is Copyright 2019 Volatility Foundation and licensed under the Volatility Software License 1.0
# which is available at https://www.volatilityfoundation.org/license/vsl-v1.0
#
import logging
from volatility.framework import exceptions, renderers
from volatility.framework.automagic import mac
from volatility.framework.configuration import requirements
from volatility.framework.interfaces import plugins
from volatility.framework.objects import utility
from volatility.framework.renderers import format_hints
from volatility.plugins.mac import tasks
vollog = logging.getLogger(__name__)
[docs]class Netstat(plugins.PluginInterface):
"""Lists all network connections for all processes."""
[docs] @classmethod
def get_requirements(cls):
return [
requirements.TranslationLayerRequirement(name = 'primary',
description = 'Kernel Address Space',
architectures = ["Intel32", "Intel64"]),
requirements.SymbolTableRequirement(name = "darwin", description = "Mac Kernel"),
requirements.PluginRequirement(name = 'tasks', plugin = tasks.Tasks, version = (1, 0, 0))
]
def _generator(self, tasks):
for task in tasks:
task_name = utility.array_to_string(task.p_comm)
pid = task.p_pid
for filp, _, _ in mac.MacUtilities.files_descriptors_for_process(self.config, self.context, task):
try:
ftype = filp.f_fglob.get_fg_type()
except exceptions.InvalidAddressException:
continue
if ftype != 'DTYPE_SOCKET':
continue
try:
socket = filp.f_fglob.fg_data.dereference().cast("socket")
except exceptions.InvalidAddressException:
continue
family = socket.get_family()
if family == 1:
try:
upcb = socket.so_pcb.dereference().cast("unpcb")
path = utility.array_to_string(upcb.unp_addr.sun_path)
except exceptions.InvalidAddressException:
continue
yield (0, (format_hints.Hex(socket.vol.offset), "UNIX", path, 0, "", 0, "",
"{}/{:d}".format(task_name, pid)))
elif family in [2, 30]:
state = socket.get_state()
proto = socket.get_protocol_as_string()
vals = socket.get_converted_connection_info()
if vals:
(lip, lport, rip, rport) = vals
yield (0, (format_hints.Hex(socket.vol.offset), proto, lip, lport, rip, rport, state,
"{}/{:d}".format(task_name, pid)))
[docs] def run(self):
# mac.MacUtilities.aslr_mask_symbol_table(self.config, self.context)
filter_func = tasks.Tasks.create_pid_filter([self.config.get('pid', None)])
return renderers.TreeGrid([("Offset", format_hints.Hex), ("Proto", str), ("Local IP", str), ("Local Port", int),
("Remote IP", str), ("Remote Port", int), ("State", str), ("Process", str)],
self._generator(
tasks.Tasks.list_tasks(self.context,
self.config['primary'],
self.config['darwin'],
filter_func = filter_func)))