354 lines
12 KiB
Python
354 lines
12 KiB
Python
import re
|
|
from PyQt5.QtGui import QPixmap
|
|
from PyQt5.QtCore import pyqtSlot, pyqtSignal, QObject
|
|
from threads import (BaseDownloadThread,
|
|
UpdateSpaceWeatherThread,
|
|
ThreadStatus,
|
|
UpdateForecastThread)
|
|
from constants import Constants
|
|
from switchable_label import MultiColorSwitchableLabel
|
|
from utilities import safe_cast
|
|
|
|
|
|
class _BaseWeatherData(QObject):
|
|
"""Base class for the weather data. Extends QObject."""
|
|
|
|
update_complete = pyqtSignal(bool)
|
|
|
|
def __init__(self):
|
|
"""Create a BaseDownloadThread object."""
|
|
super().__init__()
|
|
self._update_thread = BaseDownloadThread()
|
|
|
|
@property
|
|
def is_updating(self):
|
|
"""Return whether the thread is running."""
|
|
return self._update_thread.isRunning()
|
|
|
|
def update(self):
|
|
"""Start the thread."""
|
|
self._update_thread.start()
|
|
|
|
def _parse_data(self):
|
|
"""Dummy function. Must be overrided by subclasses."""
|
|
pass
|
|
|
|
@pyqtSlot()
|
|
def _parse_and_emit_signal(self):
|
|
"""Parse the data and emit an 'update_complete' signal.
|
|
|
|
If the download was not successful, do not parse the data.
|
|
The 'update_complete' signal propagates the thread status up to the
|
|
calling slot."""
|
|
status_ok = False
|
|
if self._update_thread.status is ThreadStatus.OK:
|
|
status_ok = True
|
|
self._parse_data()
|
|
self.update_complete.emit(status_ok)
|
|
|
|
def _double_split(self, string):
|
|
"""Given a string, return a list of lists.
|
|
|
|
First split on each line. Then split each line on whitespaces."""
|
|
return [i.split() for i in string.splitlines()]
|
|
|
|
def shutdown_thread(self):
|
|
"""Terminate the download thread."""
|
|
self._update_thread.terminate()
|
|
self._update_thread.wait()
|
|
|
|
|
|
class SpaceWeatherData(_BaseWeatherData):
|
|
"""Space weather class. Extends _BaseWeatherData."""
|
|
|
|
def __init__(self):
|
|
"""Set all attributes and connect the thread to _parse_and_emit_signal."""
|
|
super().__init__()
|
|
self.xray = ''
|
|
self.prot_el = ''
|
|
self.ak_index = ''
|
|
self.sgas = ''
|
|
self.geo_storm = ''
|
|
self.images = [
|
|
QPixmap(),
|
|
QPixmap(),
|
|
QPixmap(),
|
|
QPixmap(),
|
|
QPixmap(),
|
|
QPixmap(),
|
|
QPixmap(),
|
|
QPixmap(),
|
|
QPixmap()
|
|
]
|
|
self._update_thread = UpdateSpaceWeatherThread(self)
|
|
self._update_thread.finished.connect(self._parse_and_emit_signal)
|
|
|
|
def _parse_data(self):
|
|
"""Override _BaseWeatherData._parse_data.
|
|
|
|
Set all the data."""
|
|
self.xray = self._double_split(self.xray)
|
|
self.prot_el = self._double_split(self.prot_el)
|
|
self.ak_index = self._double_split(self.ak_index)
|
|
self.sgas = self._double_split(self.sgas)
|
|
self.geo_storm = self._double_split(self.geo_storm)
|
|
|
|
def remove_data(self):
|
|
"""Remove the reference to all the data."""
|
|
self.xray = ''
|
|
self.prot_el = ''
|
|
self.ak_index = ''
|
|
self.sgas = ''
|
|
self.geo_storm = ''
|
|
self.images = [
|
|
QPixmap(),
|
|
QPixmap(),
|
|
QPixmap(),
|
|
QPixmap(),
|
|
QPixmap(),
|
|
QPixmap(),
|
|
QPixmap(),
|
|
QPixmap(),
|
|
QPixmap()
|
|
]
|
|
|
|
|
|
def _make_labels_table(forecast, probabilities, rows):
|
|
"""Organize all the arguments to feed _get_lbl_value."""
|
|
def get_first_split(x):
|
|
return x.split("/")[0]
|
|
|
|
def get_second_split(x):
|
|
return x.split("/")[1]
|
|
|
|
def get_third_split(x):
|
|
return x.split("/")[2]
|
|
|
|
solar_row = rows["solar_row"]
|
|
event_row = rows["event_row"]
|
|
rb_now_row = rows["rb_now_row"]
|
|
ga_now_row = rows["ga_now_row"]
|
|
kp_index_row = rows["kp_index_row"]
|
|
return [
|
|
[
|
|
[forecast, solar_row, 3, None],
|
|
[probabilities, event_row + 1, 2, get_first_split],
|
|
[probabilities, event_row + 2, 2, get_first_split],
|
|
[probabilities, event_row + 3, 1, get_first_split],
|
|
[forecast, rb_now_row, 1, None],
|
|
[forecast, rb_now_row + 1, 3, None],
|
|
[probabilities, ga_now_row + 2, 1, get_first_split],
|
|
[probabilities, ga_now_row + 3, 2, get_first_split],
|
|
[probabilities, ga_now_row + 4, 2, get_first_split],
|
|
[probabilities, ga_now_row + 6, 1, get_first_split],
|
|
[probabilities, ga_now_row + 7, 2, get_first_split],
|
|
[probabilities, ga_now_row + 8, 2, get_first_split],
|
|
[forecast, kp_index_row + 3, 1, None],
|
|
[forecast, kp_index_row + 4, 1, None],
|
|
[forecast, kp_index_row + 5, 1, None],
|
|
[forecast, kp_index_row + 6, 1, None],
|
|
[forecast, kp_index_row + 7, 1, None],
|
|
[forecast, kp_index_row + 8, 1, None],
|
|
[forecast, kp_index_row + 9, 1, None],
|
|
[forecast, kp_index_row + 10, 1, None]
|
|
],
|
|
[
|
|
[forecast, solar_row, 4, None],
|
|
[probabilities, event_row + 1, 2, get_second_split],
|
|
[probabilities, event_row + 2, 2, get_second_split],
|
|
[probabilities, event_row + 3, 1, get_second_split],
|
|
[forecast, rb_now_row, 2, None],
|
|
[forecast, rb_now_row + 1, 4, None],
|
|
[probabilities, ga_now_row + 2, 1, get_second_split],
|
|
[probabilities, ga_now_row + 3, 2, get_second_split],
|
|
[probabilities, ga_now_row + 4, 2, get_second_split],
|
|
[probabilities, ga_now_row + 6, 1, get_second_split],
|
|
[probabilities, ga_now_row + 7, 2, get_second_split],
|
|
[probabilities, ga_now_row + 8, 2, get_second_split],
|
|
[forecast, kp_index_row + 3, 2, None],
|
|
[forecast, kp_index_row + 4, 2, None],
|
|
[forecast, kp_index_row + 5, 2, None],
|
|
[forecast, kp_index_row + 6, 2, None],
|
|
[forecast, kp_index_row + 7, 2, None],
|
|
[forecast, kp_index_row + 8, 2, None],
|
|
[forecast, kp_index_row + 9, 2, None],
|
|
[forecast, kp_index_row + 10, 2, None]
|
|
],
|
|
[
|
|
[forecast, solar_row, 5, None],
|
|
[probabilities, event_row + 1, 2, get_third_split],
|
|
[probabilities, event_row + 2, 2, get_third_split],
|
|
[probabilities, event_row + 3, 1, get_third_split],
|
|
[forecast, rb_now_row, 3, None],
|
|
[forecast, rb_now_row + 1, 5, None],
|
|
[probabilities, ga_now_row + 2, 1, get_third_split],
|
|
[probabilities, ga_now_row + 3, 2, get_third_split],
|
|
[probabilities, ga_now_row + 4, 2, get_third_split],
|
|
[probabilities, ga_now_row + 6, 1, get_third_split],
|
|
[probabilities, ga_now_row + 7, 2, get_third_split],
|
|
[probabilities, ga_now_row + 8, 2, get_third_split],
|
|
[forecast, kp_index_row + 3, 3, None],
|
|
[forecast, kp_index_row + 4, 3, None],
|
|
[forecast, kp_index_row + 5, 3, None],
|
|
[forecast, kp_index_row + 6, 3, None],
|
|
[forecast, kp_index_row + 7, 3, None],
|
|
[forecast, kp_index_row + 8, 3, None],
|
|
[forecast, kp_index_row + 9, 3, None],
|
|
[forecast, kp_index_row + 10, 3, None]
|
|
]
|
|
]
|
|
|
|
|
|
def _get_lbl_value(data, row, col, f=None):
|
|
"""Return the well-formatted string-value of the label."""
|
|
val = data[row][col]
|
|
if f is not None:
|
|
val = f(val)
|
|
val = val.rstrip('%')
|
|
if len(val) > 1:
|
|
val = val.lstrip('0')
|
|
return val
|
|
|
|
|
|
class ForecastData(_BaseWeatherData):
|
|
"""3-day forecast class. Extends _BaseWeatherData."""
|
|
|
|
ROW_KEYWORDS = {
|
|
"solar_row": "S1 or greater",
|
|
"event_row": "III. Event probabilities",
|
|
"rb_now_row": "R1-R2",
|
|
"ga_now_row": "Geomagnetic Activity Probabilities",
|
|
"kp_index_row": "NOAA Kp index breakdown"
|
|
}
|
|
|
|
LABELS_PER_COLUMN = 20
|
|
|
|
def __init__(self, owner):
|
|
"""Initialize all attributes and connect the thread to _parse_and_emit_signal."""
|
|
super().__init__()
|
|
self.forecast = []
|
|
self.probabilities = []
|
|
self._update_thread = UpdateForecastThread(self)
|
|
self._update_thread.finished.connect(self._parse_and_emit_signal)
|
|
self._today_lbl = owner.today_lbl
|
|
self._today_p1_lbl = owner.today_p1_lbl
|
|
self._today_p2_lbl = owner.today_p2_lbl
|
|
today_lbls = []
|
|
today_p1_lbls = []
|
|
today_p2_lbls = []
|
|
flags = ['', 'p1_', 'p2_']
|
|
for flag in flags:
|
|
title_lbl = getattr(self, "_today_" + flag + "lbl")
|
|
title_lbl.setText("-")
|
|
for index in range(self.LABELS_PER_COLUMN):
|
|
label = getattr(
|
|
owner,
|
|
"forecast_today_" + flag + str(index) + "_lbl"
|
|
)
|
|
label.setText(Constants.UNKNOWN)
|
|
if flag == flags[0]:
|
|
today_lbls.append(label)
|
|
if flag == flags[1]:
|
|
today_p1_lbls.append(label)
|
|
if flag == flags[2]:
|
|
today_p2_lbls.append(label)
|
|
|
|
self._all_lbls = [today_lbls, today_p1_lbls, today_p2_lbls]
|
|
|
|
def _parse_data(self):
|
|
"""Override _BaseWeatherData._parse_data.
|
|
|
|
Set all the relevant data."""
|
|
# Remove possible '(G\d)' from the kp_index table
|
|
self.forecast = re.sub(
|
|
'\(G\d\)', lambda obj: '', self.forecast
|
|
)
|
|
self.forecast = self.forecast.splitlines()
|
|
self.probabilities = re.sub(
|
|
'\(G\d\)', lambda obj: '', self.probabilities
|
|
)
|
|
self.probabilities = self.probabilities.splitlines()
|
|
|
|
def _split_lists(self):
|
|
"""Split the elements of forecast and probabilities."""
|
|
return [i.split() for i in self.forecast], [i.split() for i in self.probabilities]
|
|
|
|
def _find_row_with(self, data, text):
|
|
"""Given a list of strings, return the index of the first string containing the target text."""
|
|
for i, row in enumerate(data):
|
|
if text in row:
|
|
return i
|
|
return None
|
|
|
|
def _get_rows(self):
|
|
"""Get all the rows needed for updating the screen.
|
|
|
|
Raise an exception if something goes wrong."""
|
|
|
|
rows = {}
|
|
rows["solar_row"] = self._find_row_with(
|
|
self.forecast,
|
|
self.ROW_KEYWORDS["solar_row"]
|
|
)
|
|
rows["event_row"] = self._find_row_with(
|
|
self.probabilities,
|
|
self.ROW_KEYWORDS["event_row"]
|
|
)
|
|
rows["rb_now_row"] = self._find_row_with(
|
|
self.forecast,
|
|
self.ROW_KEYWORDS["rb_now_row"]
|
|
)
|
|
rows["ga_now_row"] = self._find_row_with(
|
|
self.probabilities,
|
|
self.ROW_KEYWORDS["ga_now_row"]
|
|
)
|
|
rows["kp_index_row"] = self._find_row_with(
|
|
self.forecast,
|
|
self.ROW_KEYWORDS["kp_index_row"]
|
|
)
|
|
|
|
if any(row is None for row in rows.values()):
|
|
raise Exception('Missing Rows')
|
|
else:
|
|
return rows
|
|
|
|
def _set_dates(self, forecast, solar_row):
|
|
"""Set the date labels."""
|
|
month = forecast[solar_row - 1][0]
|
|
today = forecast[solar_row - 1][1]
|
|
today_p1 = forecast[solar_row - 1][3]
|
|
today_p2 = forecast[solar_row - 1][5]
|
|
self._today_lbl.setText(month + ' ' + today)
|
|
self._today_p1_lbl.setText(month + ' ' + today_p1)
|
|
self._today_p2_lbl.setText(month + ' ' + today_p2)
|
|
|
|
def _set_labels_values(self, labels_table):
|
|
"""Set all the labels values."""
|
|
for lbl_list, table in zip(self._all_lbls, labels_table):
|
|
for lbl, row in zip(lbl_list, table):
|
|
lbl.switch_off()
|
|
value = _get_lbl_value(*row)
|
|
lbl.level = safe_cast(value, int)
|
|
if not isinstance(lbl, MultiColorSwitchableLabel):
|
|
value += '%'
|
|
lbl.setText(value)
|
|
lbl.switch_on()
|
|
|
|
def update_all_labels(self):
|
|
"""Update all the labels values.
|
|
|
|
If an exception is raised in the process, do nothing."""
|
|
try:
|
|
rows = self._get_rows()
|
|
forecast, probabilities = self._split_lists()
|
|
labels_table = _make_labels_table(forecast, probabilities, rows)
|
|
self._set_dates(forecast, rows["solar_row"])
|
|
self._set_labels_values(labels_table)
|
|
except Exception:
|
|
pass
|
|
|
|
def remove_data(self):
|
|
"""Remove the reference to the downloaded data."""
|
|
self.forecast = []
|
|
self.probabilities = []
|