Source code for mqtt_pwn.exploits.owntracks

from collections import defaultdict
import json
from json import JSONDecodeError
from prettytable import PrettyTable

from mqtt_pwn.models.message import Message
from mqtt_pwn.models.scan import Scan
from mqtt_pwn.models.topic import Topic
from mqtt_pwn.utils import drop_none

[docs]class OwnTracksExploit(object): """Represents the owntracks exploit""" def __init__(self, scan_id): self.scan_id = scan_id self._data = {} def _parse_messages(self): """Parses the messages according to the owntracks message format""" messages = Message \ .select(Message, Topic) \ .join(Scan, on=( == Message.scan)) \ .join(Topic, on=( == Message.topic)) \ .where('owntracks/'), == self.scan_id ) data = defaultdict(set) for m in messages: name = OwnTracksExploit.label_to_name( body = m.body if name is None: continue data[name].add(body) for name, body in data.items(): converted = [ OwnTracksExploit.body_to_json(b) for b in body ] self._data[name] = drop_none(converted)
[docs] @staticmethod def label_to_name(label): """Converts a label to the name """ _, _, t = label.partition('owntracks/') t = t.split('/') if len(t) != 2: return None return t[0], t[1]
[docs] @staticmethod def body_to_json(body): """Converts the body of a message to json""" try: return json.loads(body) except JSONDecodeError: return None
[docs] def create_urls_table(self): """Creates the URLs table from the messages""" if len(self._data) == 0: self._parse_messages() t = PrettyTable(field_names=[ 'User', 'Device', '# Coords' ]) t.align['URL'] = "l" for row in self._create_all_urls(): t.add_row(row) return t
def _create_google_maps_url(self, coordinations: list): """Creates the google maps URL from the messages""" base_url = "" parts = [base_url] for coord in coordinations: lat = coord.get('lat', 0) lon = coord.get('lon', 0) parts.append(f'{lat},{lon}') return '/'.join(parts), len(parts) - 1 def _create_all_urls(self): """Creates all the URLs""" for (user, device), coords in self._data.items(): url, num_of_coords = self._create_google_maps_url(coords) yield user, device, num_of_coords
[docs] def google_maps_url(self, user=None, device=None): """The public method to create a google maps URL""" if len(self._data) == 0: self._parse_messages() if not user and not device: return self._create_all_urls() identifier = (user, device) coords = self._data.get(identifier) if not coords: return None url, num_of_coords = self._create_google_maps_url(coords) return url