From 7723a3987724d01e82fc78786b13bba927cb7ceb Mon Sep 17 00:00:00 2001 From: Philipp Oleynik <pholey@utu.fi> Date: Fri, 25 Feb 2022 14:31:43 +0200 Subject: [PATCH] =?UTF-8?q?PATE=20binary=20parser=20added=20(c)=20Petri=20?= =?UTF-8?q?Niemel=C3=A4.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pate_parser.py | 367 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 367 insertions(+) create mode 100644 pate_parser.py diff --git a/pate_parser.py b/pate_parser.py new file mode 100644 index 0000000..bed21e7 --- /dev/null +++ b/pate_parser.py @@ -0,0 +1,367 @@ +#!/usr/bin/env python3 +""" + PATE Science File parser +""" +__author__ = "Petri Niemelä" +__credits__ = ["Petri Niemelä"] + +import struct +import datetime + +from typing import Any, Generator, List, NamedTuple, Optional, Sequence, Tuple, Type, Union + +import numpy as np + + +class PATEParseError(Exception): + """ Exception class for parsing errors """ + + +# Labels of the bins +LABELS = [ + 'elec_s', + 'prot_p1', 'prot_p2', 'prot_p3', 'prot_p4', 'prot_p5', 'prot_p6', 'prot_p7', 'prot_p8', 'prot_p9', 'prot_p10', + 'prot_s1', 'prot_s2', + 'trash1', 'trash2', 'trash3', + 'pad', + 'elec_p1', 'elec_p2', 'elec_p3', 'elec_p4', 'elec_p5', 'elec_p6', 'elec_p7' +] + +# Numpy datatype for binned and calibration data +science_dtype = np.dtype([(col, np.int32) for col in LABELS]) + +calibration_dtype = np.dtype([ + ('d1a', np.int32), + ('d1b', np.int32), + ('d1c', np.int32), + ('d2a', np.int32), + ('d2b', np.int32), + ('d3', np.int32), + ('ac1', np.int32), + ('ac2', np.int32), +]) + + +class HousekeepingPacket(NamedTuple): + """ PATE Housekeeping Pate """ + + bias_1: float # [V] + bias_2: float # [V] + bias_3: float # [V] + bias_4: float # [V] + bias_current: float # [µA] + mag_long_tube: float # [µT] + mag_short_tube: float # [µT] + reference_2v5: int + batt_voltage: float # [V] + batt_current: float # [mA] + current_1v5: float # [mA] + current_1v8: float # [mA] + current_3v3: float # [mA] + current_2v5: float # [mA] + current_n2v5: float # [mA] + temperature: float # [°C] + voltage_n2v5: float # [V] + voltage_2v5: float # [V] + voltage_3v3: float # [V] + voltage_1v8: float # [V] + voltage_1v5: float # [V] + timestamp: float + + @classmethod + def parse(cls, data: bytes) -> 'HousekeepingPacket': + raw = struct.unpack(">32H", data) + return cls( + bias_1=raw[0] * 2.5 / raw[7], + bias_2=raw[1] * 2.5 / raw[7], + bias_3=34.3 * raw[2] * 2.5 / raw[7], + bias_4=34.3 * raw[3] * 2.5 / raw[7], + bias_current=2500 * raw[4] * 2.5 / raw[7], + mag_long_tube=0.07 * (raw[5] - 2048), + mag_short_tube=0.07 * (raw[6] - 2048), + reference_2v5=raw[7], + batt_voltage=0.00242 * raw[17], + batt_current=0.517 * raw[9] + 4.4, + current_1v5=0.547 * raw[10], + current_1v8=0.547 * raw[11], + current_3v3=0.547 * raw[12], + current_2v5=0.547 * raw[13], + current_n2v5=-0.273 * raw[14], + temperature=-0.0932 * raw[15] + 158.03, + # NC = raw[16], + # NC = raw[17], + voltage_n2v5=-0.0011 * raw[18], + voltage_2v5=0.0011 * raw[19], + voltage_3v3=0.0011 * raw[20], + voltage_1v8=0.0011 * raw[21], + voltage_1v5=0.0011 * raw[22], + # NC = raw[23], + timestamp=((raw[24] << 16) | raw[25]) + raw[27] / 1000, + # Mag_rot_tube = raw[28], + # Mag_sun_tube = raw[29], + # Scrubbing data = raw[30], + # NA = raw[31], + ) + + +class SciencePacket(NamedTuple): + """ PATE Science Packet (0x41-0x52) """ + + segment: int + bins: np.array + + @classmethod + def parse(cls, segment: int, data: bytes) -> 'SciencePacket': + raw = np.array([int.from_bytes(data[i:i + 3], 'big') for i in range(0, len(data), 3)]) + raw = raw.reshape(-1, 24) + return cls(segment, raw) + + +class CalibrationPacket(NamedTuple): + """ PATE Calibration Packet (scope) (0x82) """ + + segment: int + channels: np.array + + @classmethod + def parse(cls, segment: int, data: bytes) -> 'CalibrationPacket': + bits = "".join([f"{val:08b}"[::-1] for val in data]) + raw = np.array([int(bits[i:i + 14][::-1], 2) for i in range(0, len(bits), 14)], dtype=np.int32) + channels = raw.reshape(-1, 8) + return cls(segment, channels) + + +class ConfigurationEntry(NamedTuple): + """ PATE Configuration Entry (0xF0) """ + + bias_sun_low: int + bias_sun_high: int + bias_rot_low: int + bias_rot_high: int + pc3_pin_e_high: int + pc3_pin_p_low: int + pc3_pin_p_high: int + pc4_pin_e_high: int + pc5_pin_e_low: int + pc5_pin_e_high: int + pc5_pin_p_low: int + pc5_pin_p_mid: int + pc5_pin_p_high: int + sun_gain_d1a: int + sun_gain_d1b: int + sun_gain_d1c: int + sun_gain_d2a: int + sun_gain_d2b: int + sun_gain_d3: int + rot_gain_d1a: int + rot_gain_d1b: int + rot_gain_d1c: int + rot_gain_d2a: int + rot_gain_d2b: int + rot_gain_d3: int + sun_pin_mult: int + sun_pcn_e_low: int + sun_pcn_e_high: int + sun_pcn_p1_low: int + sun_pcn_p1_high: int + sun_pcn_p2_low: int + sun_pcn_p2_high: int + rot_pin_mult: int + rot_pcn_e_low: int + rot_pcn_e_high: int + rot_pcn_p1_low: int + rot_pcn_p1_high: int + rot_pcn_p2_low: int + rot_pcn_p2_high: int + sun_threshold_d1a: int + sun_threshold_d1b: int + sun_threshold_d1c: int + sun_threshold_d2a: int + sun_threshold_d2b: int + sun_threshold_d3: int + sun_threshold_ac1: int + sun_threshold_ac2: int + rot_threshold_d1a: int + rot_threshold_d1b: int + rot_threshold_d1c: int + rot_threshold_d2a: int + rot_threshold_d2b: int + rot_threshold_d3: int + rot_threshold_ac1: int + rot_threshold_ac2: int + sector_count: int + sector_duration: int + + @classmethod + def parse(cls, data) -> 'ConfigurationEntry': + cfg = struct.unpack(">57I", data) + return cls(*cfg) + + +class CalibrationConfigurationEntry(NamedTuple): + """ PATE Calibration Run configuration (0xF1) """ + + mode: int + preroll: int + length: int + + @classmethod + def parse(cls, data: bytes) -> 'CalibrationConfigurationEntry': + parsed = struct.unpack(">3H", data) + return cls(parsed[0], parsed[1], parsed[2]) + + +class AttitudeEntry(NamedTuple): + """ Satellite Attitude Entry (0xF2) """ + timestamp: float + + # Quaternion in ECI + q: Tuple[float, float, float, float] + + # Angular rate + w: Tuple[float, float, float] + + @classmethod + def parse(cls, data: bytes) -> 'AttitudeEntry': + parsed = struct.unpack(">IH4d3d", data) + return AttitudeEntry( + timestamp=parsed[0] + parsed[1] / 1000, + q=np.array(parsed[2:6]), + w=np.array(parsed[6:9]) + ) + + +class SpinRateEntry(NamedTuple): + """ Satellite spin rate update entry (0xF3) """ + + spin_rate: float + + @classmethod + def parse(cls, data: bytes) -> 'SpinRateEntry': + parsed = struct.unpack(">d", data) + return cls(spin_rate=parsed[0]) + + +class MagnetometerEntry(NamedTuple): + """ Satellite's ADCS magnetometer Entry (0xF4) """ + + timestamp: float + field: Tuple[float, float, float] + + @classmethod + def parse(cls, data: bytes) -> 'MagnetometerEntry': + parsed = struct.unpack(">IH3d", data) + return cls( + timestamp=parsed[0] + parsed[1] / 1000, + field=parsed[2:5] + ) + + +class TimeUpdateEntry(NamedTuple): + """ Pate Time Update Entry (0xF5) """ + + old: float + new: float + + @classmethod + def parse(cls, data: bytes) -> 'TimeUpdateEntry': + parsed = struct.unpack(">IHIH", data) + return cls( + old=parsed[0] + parsed[1] / 1000, + new=parsed[2] + parsed[3] / 1000, + ) + + +def parse_timestamp(timestamp: int) -> datetime.datetime: + """ Convert Unix timesamp to datetime object """ + return datetime.datetime.utcfromtimestamp(timestamp).replace(tzinfo=datetime.timezone.utc) + + +def read_chunks(filepath: str) -> Generator[Tuple[int, bytes], None, None]: + """ Read binary chunks from the file. """ + with open(filepath, "rb") as f: + while True: + + # Read header + header = f.read(2) + if len(header) == 0: # File ended? + break + if len(header) != 2: + raise PATEParseError("Failed to read header! Unexpected end of file.") + + # Parse header + data_len, cmd = struct.unpack("BB", header) + + # Read data + data = f.read(data_len) + if len(data) != data_len: + raise PATEParseError("Failed to read data! Unexpected end of file.") + + yield cmd, data + + +PacketTypes = Union[HousekeepingPacket, SciencePacket, CalibrationPacket, ConfigurationEntry, \ + CalibrationConfigurationEntry, AttitudeEntry, SpinRateEntry, MagnetometerEntry, TimeUpdateEntry] + +PacketTypeTypes = Union[Type[HousekeepingPacket], Type[SciencePacket], Type[CalibrationPacket], Type[ConfigurationEntry], \ + Type[CalibrationConfigurationEntry], Type[AttitudeEntry], Type[SpinRateEntry], Type[MagnetometerEntry], Type[TimeUpdateEntry]] + + +def read_parsed(filepath: str, types: Optional[Sequence[PacketTypeTypes]] = None) -> Generator[PacketTypes, None, None]: + """ + Read parsed data from the file. + + Args: + filepath: Filepath to file to be read. + types: A list or tuple of wanted Packet types. If None all types are returned. + + Returns: + A generator object which yields different PATE packet types. + """ + + def wanted(packet_type: PacketTypeTypes) -> bool: + return (types is None) or (packet_type in types) + + for cmd, data in read_chunks(filepath): + if cmd == 0x31: + if wanted(HousekeepingPacket): + yield HousekeepingPacket.parse(data) + elif 0x41 <= cmd <= 0x52: + if wanted(SciencePacket): + yield SciencePacket.parse(cmd - 0x41, data) + elif 0x82 <= cmd <= 0xC2: + if wanted(CalibrationPacket): + yield CalibrationPacket.parse(cmd - 0x82, data) + elif cmd == 0xF0: + if wanted(ConfigurationEntry): + yield ConfigurationEntry.parse(data) + elif cmd == 0xF1: + if wanted(CalibrationConfigurationEntry): + yield CalibrationConfigurationEntry.parse(data) + elif cmd == 0xF2: + if wanted(AttitudeEntry): + yield AttitudeEntry.parse(data) + elif cmd == 0xF3: + if wanted(SpinRateEntry): + yield SpinRateEntry.parse(data) + elif cmd == 0xF4: + if wanted(MagnetometerEntry): + yield MagnetometerEntry.parse(data) + elif cmd == 0xF5: + if wanted(TimeUpdateEntry): + yield TimeUpdateEntry.parse(data) + + else: + raise PATEParseError(f"Unknown data type {cmd}") + + +if __name__ == "__main__": + pass + # import argparse + # + # parser = argparse.ArgumentParser(description='PATE Science Parser') + # parser.add_argument('filename', type=str) + # args = parser.parse_args() + # + # for packet in read_parsed(args.filename): + # print(packet) -- GitLab