Skip to content
Snippets Groups Projects
Commit 7723a398 authored by Philipp Oleynik's avatar Philipp Oleynik
Browse files

PATE binary parser added (c) Petri Niemelä.

parent a8ba9a38
Branches
No related tags found
No related merge requests found
#!/usr/bin/env python3
"""
PATE Science File parser
"""
__author__ = "Petri Niemelä"
__credits__ = ["Petri Niemelä"]
import struct
import datetime
from typing import Any, Generator, List, NamedTuple, Optional, Sequence, Tuple, Type, Union
import numpy as np
class PATEParseError(Exception):
""" Exception class for parsing errors """
# Labels of the bins
LABELS = [
'elec_s',
'prot_p1', 'prot_p2', 'prot_p3', 'prot_p4', 'prot_p5', 'prot_p6', 'prot_p7', 'prot_p8', 'prot_p9', 'prot_p10',
'prot_s1', 'prot_s2',
'trash1', 'trash2', 'trash3',
'pad',
'elec_p1', 'elec_p2', 'elec_p3', 'elec_p4', 'elec_p5', 'elec_p6', 'elec_p7'
]
# Numpy datatype for binned and calibration data
science_dtype = np.dtype([(col, np.int32) for col in LABELS])
calibration_dtype = np.dtype([
('d1a', np.int32),
('d1b', np.int32),
('d1c', np.int32),
('d2a', np.int32),
('d2b', np.int32),
('d3', np.int32),
('ac1', np.int32),
('ac2', np.int32),
])
class HousekeepingPacket(NamedTuple):
""" PATE Housekeeping Pate """
bias_1: float # [V]
bias_2: float # [V]
bias_3: float # [V]
bias_4: float # [V]
bias_current: float # [µA]
mag_long_tube: float # [µT]
mag_short_tube: float # [µT]
reference_2v5: int
batt_voltage: float # [V]
batt_current: float # [mA]
current_1v5: float # [mA]
current_1v8: float # [mA]
current_3v3: float # [mA]
current_2v5: float # [mA]
current_n2v5: float # [mA]
temperature: float # [°C]
voltage_n2v5: float # [V]
voltage_2v5: float # [V]
voltage_3v3: float # [V]
voltage_1v8: float # [V]
voltage_1v5: float # [V]
timestamp: float
@classmethod
def parse(cls, data: bytes) -> 'HousekeepingPacket':
raw = struct.unpack(">32H", data)
return cls(
bias_1=raw[0] * 2.5 / raw[7],
bias_2=raw[1] * 2.5 / raw[7],
bias_3=34.3 * raw[2] * 2.5 / raw[7],
bias_4=34.3 * raw[3] * 2.5 / raw[7],
bias_current=2500 * raw[4] * 2.5 / raw[7],
mag_long_tube=0.07 * (raw[5] - 2048),
mag_short_tube=0.07 * (raw[6] - 2048),
reference_2v5=raw[7],
batt_voltage=0.00242 * raw[17],
batt_current=0.517 * raw[9] + 4.4,
current_1v5=0.547 * raw[10],
current_1v8=0.547 * raw[11],
current_3v3=0.547 * raw[12],
current_2v5=0.547 * raw[13],
current_n2v5=-0.273 * raw[14],
temperature=-0.0932 * raw[15] + 158.03,
# NC = raw[16],
# NC = raw[17],
voltage_n2v5=-0.0011 * raw[18],
voltage_2v5=0.0011 * raw[19],
voltage_3v3=0.0011 * raw[20],
voltage_1v8=0.0011 * raw[21],
voltage_1v5=0.0011 * raw[22],
# NC = raw[23],
timestamp=((raw[24] << 16) | raw[25]) + raw[27] / 1000,
# Mag_rot_tube = raw[28],
# Mag_sun_tube = raw[29],
# Scrubbing data = raw[30],
# NA = raw[31],
)
class SciencePacket(NamedTuple):
""" PATE Science Packet (0x41-0x52) """
segment: int
bins: np.array
@classmethod
def parse(cls, segment: int, data: bytes) -> 'SciencePacket':
raw = np.array([int.from_bytes(data[i:i + 3], 'big') for i in range(0, len(data), 3)])
raw = raw.reshape(-1, 24)
return cls(segment, raw)
class CalibrationPacket(NamedTuple):
""" PATE Calibration Packet (scope) (0x82) """
segment: int
channels: np.array
@classmethod
def parse(cls, segment: int, data: bytes) -> 'CalibrationPacket':
bits = "".join([f"{val:08b}"[::-1] for val in data])
raw = np.array([int(bits[i:i + 14][::-1], 2) for i in range(0, len(bits), 14)], dtype=np.int32)
channels = raw.reshape(-1, 8)
return cls(segment, channels)
class ConfigurationEntry(NamedTuple):
""" PATE Configuration Entry (0xF0) """
bias_sun_low: int
bias_sun_high: int
bias_rot_low: int
bias_rot_high: int
pc3_pin_e_high: int
pc3_pin_p_low: int
pc3_pin_p_high: int
pc4_pin_e_high: int
pc5_pin_e_low: int
pc5_pin_e_high: int
pc5_pin_p_low: int
pc5_pin_p_mid: int
pc5_pin_p_high: int
sun_gain_d1a: int
sun_gain_d1b: int
sun_gain_d1c: int
sun_gain_d2a: int
sun_gain_d2b: int
sun_gain_d3: int
rot_gain_d1a: int
rot_gain_d1b: int
rot_gain_d1c: int
rot_gain_d2a: int
rot_gain_d2b: int
rot_gain_d3: int
sun_pin_mult: int
sun_pcn_e_low: int
sun_pcn_e_high: int
sun_pcn_p1_low: int
sun_pcn_p1_high: int
sun_pcn_p2_low: int
sun_pcn_p2_high: int
rot_pin_mult: int
rot_pcn_e_low: int
rot_pcn_e_high: int
rot_pcn_p1_low: int
rot_pcn_p1_high: int
rot_pcn_p2_low: int
rot_pcn_p2_high: int
sun_threshold_d1a: int
sun_threshold_d1b: int
sun_threshold_d1c: int
sun_threshold_d2a: int
sun_threshold_d2b: int
sun_threshold_d3: int
sun_threshold_ac1: int
sun_threshold_ac2: int
rot_threshold_d1a: int
rot_threshold_d1b: int
rot_threshold_d1c: int
rot_threshold_d2a: int
rot_threshold_d2b: int
rot_threshold_d3: int
rot_threshold_ac1: int
rot_threshold_ac2: int
sector_count: int
sector_duration: int
@classmethod
def parse(cls, data) -> 'ConfigurationEntry':
cfg = struct.unpack(">57I", data)
return cls(*cfg)
class CalibrationConfigurationEntry(NamedTuple):
""" PATE Calibration Run configuration (0xF1) """
mode: int
preroll: int
length: int
@classmethod
def parse(cls, data: bytes) -> 'CalibrationConfigurationEntry':
parsed = struct.unpack(">3H", data)
return cls(parsed[0], parsed[1], parsed[2])
class AttitudeEntry(NamedTuple):
""" Satellite Attitude Entry (0xF2) """
timestamp: float
# Quaternion in ECI
q: Tuple[float, float, float, float]
# Angular rate
w: Tuple[float, float, float]
@classmethod
def parse(cls, data: bytes) -> 'AttitudeEntry':
parsed = struct.unpack(">IH4d3d", data)
return AttitudeEntry(
timestamp=parsed[0] + parsed[1] / 1000,
q=np.array(parsed[2:6]),
w=np.array(parsed[6:9])
)
class SpinRateEntry(NamedTuple):
""" Satellite spin rate update entry (0xF3) """
spin_rate: float
@classmethod
def parse(cls, data: bytes) -> 'SpinRateEntry':
parsed = struct.unpack(">d", data)
return cls(spin_rate=parsed[0])
class MagnetometerEntry(NamedTuple):
""" Satellite's ADCS magnetometer Entry (0xF4) """
timestamp: float
field: Tuple[float, float, float]
@classmethod
def parse(cls, data: bytes) -> 'MagnetometerEntry':
parsed = struct.unpack(">IH3d", data)
return cls(
timestamp=parsed[0] + parsed[1] / 1000,
field=parsed[2:5]
)
class TimeUpdateEntry(NamedTuple):
""" Pate Time Update Entry (0xF5) """
old: float
new: float
@classmethod
def parse(cls, data: bytes) -> 'TimeUpdateEntry':
parsed = struct.unpack(">IHIH", data)
return cls(
old=parsed[0] + parsed[1] / 1000,
new=parsed[2] + parsed[3] / 1000,
)
def parse_timestamp(timestamp: int) -> datetime.datetime:
""" Convert Unix timesamp to datetime object """
return datetime.datetime.utcfromtimestamp(timestamp).replace(tzinfo=datetime.timezone.utc)
def read_chunks(filepath: str) -> Generator[Tuple[int, bytes], None, None]:
""" Read binary chunks from the file. """
with open(filepath, "rb") as f:
while True:
# Read header
header = f.read(2)
if len(header) == 0: # File ended?
break
if len(header) != 2:
raise PATEParseError("Failed to read header! Unexpected end of file.")
# Parse header
data_len, cmd = struct.unpack("BB", header)
# Read data
data = f.read(data_len)
if len(data) != data_len:
raise PATEParseError("Failed to read data! Unexpected end of file.")
yield cmd, data
PacketTypes = Union[HousekeepingPacket, SciencePacket, CalibrationPacket, ConfigurationEntry, \
CalibrationConfigurationEntry, AttitudeEntry, SpinRateEntry, MagnetometerEntry, TimeUpdateEntry]
PacketTypeTypes = Union[Type[HousekeepingPacket], Type[SciencePacket], Type[CalibrationPacket], Type[ConfigurationEntry], \
Type[CalibrationConfigurationEntry], Type[AttitudeEntry], Type[SpinRateEntry], Type[MagnetometerEntry], Type[TimeUpdateEntry]]
def read_parsed(filepath: str, types: Optional[Sequence[PacketTypeTypes]] = None) -> Generator[PacketTypes, None, None]:
"""
Read parsed data from the file.
Args:
filepath: Filepath to file to be read.
types: A list or tuple of wanted Packet types. If None all types are returned.
Returns:
A generator object which yields different PATE packet types.
"""
def wanted(packet_type: PacketTypeTypes) -> bool:
return (types is None) or (packet_type in types)
for cmd, data in read_chunks(filepath):
if cmd == 0x31:
if wanted(HousekeepingPacket):
yield HousekeepingPacket.parse(data)
elif 0x41 <= cmd <= 0x52:
if wanted(SciencePacket):
yield SciencePacket.parse(cmd - 0x41, data)
elif 0x82 <= cmd <= 0xC2:
if wanted(CalibrationPacket):
yield CalibrationPacket.parse(cmd - 0x82, data)
elif cmd == 0xF0:
if wanted(ConfigurationEntry):
yield ConfigurationEntry.parse(data)
elif cmd == 0xF1:
if wanted(CalibrationConfigurationEntry):
yield CalibrationConfigurationEntry.parse(data)
elif cmd == 0xF2:
if wanted(AttitudeEntry):
yield AttitudeEntry.parse(data)
elif cmd == 0xF3:
if wanted(SpinRateEntry):
yield SpinRateEntry.parse(data)
elif cmd == 0xF4:
if wanted(MagnetometerEntry):
yield MagnetometerEntry.parse(data)
elif cmd == 0xF5:
if wanted(TimeUpdateEntry):
yield TimeUpdateEntry.parse(data)
else:
raise PATEParseError(f"Unknown data type {cmd}")
if __name__ == "__main__":
pass
# import argparse
#
# parser = argparse.ArgumentParser(description='PATE Science Parser')
# parser.add_argument('filename', type=str)
# args = parser.parse_args()
#
# for packet in read_parsed(args.filename):
# print(packet)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment