Deploy script for Windows

This commit is contained in:
Marco
2019-06-25 23:20:09 +02:00
parent 8b77d3a289
commit 8a3d5f811e
24 changed files with 3934 additions and 21034 deletions

2
.flake8 Normal file
View File

@@ -0,0 +1,2 @@
[flake8]
ignore = E221, E501, W605, W504

2
.gitignore vendored
View File

@@ -9,5 +9,5 @@ themes/.current_theme
launch.bat
designer.bat
*.sh
.vscode/
default_pics/
.vscode/

View File

@@ -16,10 +16,11 @@ from PyQt5.QtWidgets import (QMainWindow,
QSplashScreen,
QTreeWidgetItem,)
from PyQt5.QtGui import QPixmap
from PyQt5 import uic, QtGui
from PyQt5 import uic
from PyQt5.QtCore import (QFileInfo,
Qt,
pyqtSlot,)
pyqtSlot,
QRect,)
from audio_player import AudioPlayer
from weatherdata import SpaceWeatherData, ForecastData
@@ -31,7 +32,8 @@ from constants import (Constants,
Database,
ChecksumWhat,
Messages,
Signal,)
Signal,
MainTabs,)
from themesmanager import ThemeManager
from utilities import (checksum_ok,
uncheck_and_emit,
@@ -41,7 +43,8 @@ from utilities import (checksum_ok,
is_undef_freq,
is_undef_band,
format_numbers,
resource_path,)
resource_path,
safe_cast)
# import default_imgs_rc
@@ -51,12 +54,13 @@ Ui_MainWindow, _ = uic.loadUiType(qt_creator_file)
class Artemis(QMainWindow, Ui_MainWindow):
"""Main application class."""
def __init__(self):
"""Set all connections of the application."""
super().__init__()
self.setupUi(self)
self.set_initial_size()
self.setWindowIcon(QtGui.QIcon(":/icon/default_pics/Artemis3.500px.png"))
self.closing = False
self.download_window = DownloadWindow()
self.download_window.complete.connect(self.show_downloaded_signals)
@@ -482,10 +486,9 @@ class Artemis(QMainWindow, Ui_MainWindow):
# Left list widget and search bar.
self.search_bar.textChanged.connect(self.display_signals)
self.result_list.currentItemChanged.connect(self.display_specs)
self.result_list.itemDoubleClicked.connect(
lambda: self.main_tab.setCurrentWidget(self.signal_properties_tab)
)
self.signals_list.currentItemChanged.connect(self.display_specs)
self.signals_list.itemDoubleClicked.connect(self.set_visible_tab)
self.audio_widget = AudioPlayer(
self.play,
self.pause,
@@ -529,26 +532,56 @@ class Artemis(QMainWindow, Ui_MainWindow):
self.update_forecast_bar.set_idle()
self.forecast_data.update_complete.connect(self.update_forecast)
self.main_tab.currentChanged.connect(self.hide_show_right_widget)
# Final operations.
self.theme_manager.start()
self.load_db()
self.display_signals()
@pyqtSlot()
def hide_show_right_widget(self):
"""Hide or show the waterfall+audio widget based on the current tab."""
if self.main_tab.currentIndex() == MainTabs.FORECAST:
self.fixed_audio_and_image.setVisible(False)
elif not self.fixed_audio_and_image.isVisible():
self.fixed_audio_and_image.setVisible(True)
@pyqtSlot()
def set_visible_tab(self):
"""Set the current main tab when double-clicking a signal name."""
if self.main_tab.currentWidget() != self.signal_properties_tab:
self.main_tab.setCurrentWidget(self.signal_properties_tab)
else:
self.main_tab.setCurrentWidget(self.filter_tab)
@pyqtSlot()
def start_update_forecast(self):
"""Start the update of the 3-day forecast screen.
Start the corresponding thread.
"""
if not self.forecast_data.is_updating:
self.update_forecast_bar.set_updating()
self.forecast_data.update()
@pyqtSlot()
def start_update_space_weather(self):
"""Start the update of the space weather screen.
Start the corresponding thread.
"""
if not self.space_weather_data.is_updating:
self.update_now_bar.set_updating()
self.space_weather_data.update()
@pyqtSlot(bool)
def update_forecast(self, status_ok):
"""Update the 3-day forecast screen after a successful download.
If the download was not successful throw a warning. In any case remove
the downloaded data.
"""
self.update_forecast_bar.set_idle()
if status_ok:
self.forecast_data.update_all_labels()
@@ -559,10 +592,18 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot(bool)
def update_space_weather(self, status_ok):
"""Update the space weather screen after a successful download.
If the download was not successful throw a warning. In any case remove
the downloaded data.
"""
self.update_now_bar.set_idle()
if status_ok:
xray_long = float(self.space_weather_data.xray[-1][7])
format_text = lambda letter, power: letter + f"{xray_long * 10**power:.1f}"
xray_long = safe_cast(self.space_weather_data.xray[-1][7], float)
def format_text(letter, power):
return letter + f"{xray_long * 10**power:.1f}"
if xray_long < 1e-8 and xray_long != -1.00e+05:
self.peak_flux_lbl.setText(format_text("<A", 8))
elif xray_long >= 1e-8 and xray_long < 1e-7:
@@ -593,7 +634,7 @@ class Artemis(QMainWindow, Ui_MainWindow):
elif xray_long == -1.00e+05:
self.switchable_r_labels.switch_off_all()
pro10 = float(self.space_weather_data.prot_el[-1][8])
pro10 = safe_cast(self.space_weather_data.prot_el[-1][8], float)
if pro10 < 10 and pro10 != -1.00e+05:
self.switchable_s_labels.switch_on(self.s0_now_lbl)
elif pro10 >= 10 and pro10 < 100:
@@ -609,9 +650,13 @@ class Artemis(QMainWindow, Ui_MainWindow):
elif pro10 == -1.00e+05:
self.switchable_s_labels.switch_off_all()
k_index = int(self.space_weather_data.ak_index[8][11].replace('.', ''))
k_index = safe_cast(
self.space_weather_data.ak_index[8][11].replace('.', ''), int
)
self.k_index_lbl.setText(str(k_index))
a_index = int(self.space_weather_data.ak_index[7][7].replace('.', ''))
a_index = safe_cast(
self.space_weather_data.ak_index[7][7].replace('.', ''), int
)
self.a_index_lbl.setText(str(a_index))
if k_index == 0:
@@ -670,7 +715,9 @@ class Artemis(QMainWindow, Ui_MainWindow):
self.a_storm_labels.switch_on(self.a_sev_storm_lbl)
index = self.space_weather_data.geo_storm[6].index("was") + 1
k_index_24_hmax = int(self.space_weather_data.geo_storm[6][index])
k_index_24_hmax = safe_cast(
self.space_weather_data.geo_storm[6][index], int
)
if k_index_24_hmax == 0:
self.switchable_g_today_labels.switch_on(self.g0_today_lbl)
elif k_index_24_hmax == 1:
@@ -692,12 +739,18 @@ class Artemis(QMainWindow, Ui_MainWindow):
elif k_index_24_hmax == 9:
self.switchable_g_today_labels.switch_on(self.g5_today_lbl)
val = int(self.space_weather_data.ak_index[7][2].replace('.', ''))
val = safe_cast(
self.space_weather_data.ak_index[7][2].replace('.', ''), int
)
self.sfi_lbl.setText(f"{val}")
val = int([x[4] for x in self.space_weather_data.sgas if "SSN" in x][0])
val = safe_cast(
[x[4] for x in self.space_weather_data.sgas
if "SSN" in x][0], int
)
self.sn_lbl.setText(f"{val:d}")
for label, pixmap in zip(self.space_weather_labels, self.space_weather_data.images):
for label, pixmap in zip(self.space_weather_labels,
self.space_weather_data.images):
label.pixmap = pixmap
label.make_transparent()
label.apply_pixmap()
@@ -708,6 +761,12 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot()
def go_to_gfd(self, by):
"""Open a browser tab with the GFD site.
Make the search by frequency or location.
Argument:
by -- either GfdType.FREQ or GfdType.LOC.
"""
query = "/?q="
if by is GfdType.FREQ:
value_in_mhz = self.freq_gfd.value() \
@@ -723,23 +782,37 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot(QListWidgetItem)
def remove_if_unselected_modulation(self, item):
"""If an item is unselected from the modulations list, hide the item."""
if not item.isSelected():
self.show_matching_modulations(self.search_bar_modulation.text())
@pyqtSlot(QListWidgetItem)
def remove_if_unselected_location(self, item):
"""If an item is unselected from the locations list, hide the item."""
if not item.isSelected():
self.show_matching_locations(self.search_bar_location.text())
@pyqtSlot(str)
def show_matching_modulations(self, text):
"""Show the modulations which matches 'text'.
The match criterion is defined in 'show_matching_strings'."""
self.show_matching_strings(self.modulation_list, text)
@pyqtSlot(str)
def show_matching_locations(self, text):
"""Show the locations which matches 'text'.
The match criterion is defined in 'show_matching_strings'."""
self.show_matching_strings(self.locations_list, text)
def show_matching_strings(self, list_elements, text):
"""Show all elements of QListWidget that matches (even partially) a target text.
Arguments:
list_elements -- the QListWidget
text -- the target text.
"""
for index in range(list_elements.count()):
item = list_elements.item(index)
if text.lower() in item.text().lower() or item.isSelected():
@@ -748,6 +821,7 @@ class Artemis(QMainWindow, Ui_MainWindow):
item.setHidden(True)
def set_mode_tree_widget(self):
"""Construct the QTreeWidget for the 'Mode' screen."""
for parent, children in Constants.MODES.items():
iparent = QTreeWidgetItem([parent])
self.mode_tree_widget.addTopLevelItem(iparent)
@@ -757,6 +831,10 @@ class Artemis(QMainWindow, Ui_MainWindow):
self.mode_tree_widget.expandAll()
def manage_mode_selections(self):
"""Rules the selection of childs items of the 'Mode' QTreeWidget.
If a parent is selected all its children will be selected as well.
"""
selected_items = self.mode_tree_widget.selectedItems()
parents = Constants.MODES.keys()
for parent in parents:
@@ -766,14 +844,21 @@ class Artemis(QMainWindow, Ui_MainWindow):
item.child(i).setSelected(True)
def set_initial_size(self):
"""Function to handle high resolution screens. The function sets bigger
sizes for all the relevant fixed-size widgets.
Also by default it sets the size to 3/4 of the available space
both vertically and horizontally."""
"""Handle high resolution screens.
Set bigger sizes for all the relevant fixed-size widgets.
Also by default set the size to 3/4 of the available space both
vertically and horizontally.
"""
d = QDesktopWidget().availableGeometry()
center = d.center()
w = d.width()
h = d.height()
self.setGeometry(50, 50, (3 * w) // 4, (3 * h) // 4)
rect = QRect()
rect.setHeight((3 * h) // 4)
rect.setWidth((3 * w) // 4)
rect.moveCenter(center)
self.setGeometry(rect)
if w > 3000 or h > 2000:
self.fixed_audio_and_image.setFixedSize(540, 1150)
self.fixed_audio_and_image.setMaximumSize(540, 1150)
@@ -797,6 +882,10 @@ class Artemis(QMainWindow, Ui_MainWindow):
self.freq_gfd.setFixedWidth(200)
self.unit_freq_gfd.setFixedWidth(120)
self.mode_tree_widget.setMinimumWidth(500)
self.modulation_list.setMinimumWidth(500)
self.locations_list.setMinimumWidth(500)
self.audio_progress.setFixedHeight(20)
self.volume.setStyleSheet("""
QSlider::groove:horizontal {
@@ -816,12 +905,23 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot()
def download_db(self):
"""Start the database download.
Do nothing if already downloading.
"""
if not self.download_window.isVisible():
self.download_window.start_download()
self.download_window.show()
@pyqtSlot()
def ask_if_download(self):
"""Check if the database is at its latest version.
If a new database is available automatically start the download.
If not ask if should download it anyway.
If already downloading do nothing.
Handle possible connection errors.
"""
if not self.download_window.isVisible():
db_path = os.path.join(Constants.DATA_FOLDER, Database.NAME)
try:
@@ -849,6 +949,13 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot()
def check_db_ver(self):
"""Check if the database is at its latest version.
If a new database version is available, ask if it should be downloaded.
If a new database version is not available display a message.
If already downloading do nothing.
Handle possible connection errors.
"""
if not self.download_window.isVisible():
db_path = os.path.join(Constants.DATA_FOLDER, Database.NAME)
answer = None
@@ -882,19 +989,24 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot()
def show_downloaded_signals(self):
"""Load and display the database signal list."""
self.search_bar.setEnabled(True)
self.load_db()
self.display_signals()
def load_db(self):
names = Database.NAMES
"""Load the database from file.
Populate the signals list and set the total number of signals.
Handle possible missing file error.
"""
try:
self.db = read_csv(os.path.join(Constants.DATA_FOLDER, Database.NAME),
sep=Database.DELIMITER,
header=None,
index_col=0,
dtype={name: str for name in Database.STRINGS},
names=names)
names=Database.NAMES)
except FileNotFoundError:
self.search_bar.setDisabled(True)
answer = pop_up(self, title=Messages.NO_DB,
@@ -904,14 +1016,15 @@ class Artemis(QMainWindow, Ui_MainWindow):
if answer == QMessageBox.Yes:
self.download_db()
else:
self.db = self.db.groupby(level=0).first()
self.signal_names = self.db.index
self.total_signals = len(self.signal_names)
self.db.fillna(Constants.UNKNOWN, inplace=True)
self.db[Signal.WIKI_CLICKED] = False
self.update_status_tip(self.total_signals)
self.result_list.clear()
self.result_list.addItems(self.signal_names)
self.result_list.setCurrentItem(None)
self.signals_list.clear()
self.signals_list.addItems(self.signal_names)
self.signals_list.setCurrentItem(None)
self.modulation_list.addItems(
self.collect_list(
Signal.MODULATION
@@ -923,7 +1036,13 @@ class Artemis(QMainWindow, Ui_MainWindow):
)
)
def collect_list(self, list_property, separator=';'):
def collect_list(self, list_property, separator=Constants.FIELD_SEPARATOR):
"""Collect all the entrys of a QListWidget.
Handle multiple entries in one item seprated by a separator.
Keyword argument:
separator -- the separator character for multiple-entries items.
"""
values = self.db[list_property]
values = list(
set([
@@ -940,6 +1059,9 @@ class Artemis(QMainWindow, Ui_MainWindow):
lower_spin_box,
upper_combo_box,
upper_spin_box):
"""Forbid to a lower limit to be greater than the corresponding upper one.
Used for frequency and bandwidth screens."""
if lower_spin_box.isEnabled():
unit_conversion = {'Hz': ['kHz', 'MHz', 'GHz'],
'kHz': ['MHz', 'GHz'],
@@ -980,6 +1102,9 @@ class Artemis(QMainWindow, Ui_MainWindow):
upper_unit,
upper_confidence,
range_lbl):
"""Display the actual range applied for the signal's property search.
Used for frequency and bandwidth screens."""
activate_low = False
activate_high = False
color = self.inactive_color
@@ -1021,6 +1146,7 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot()
def set_acf_interval_label(self):
"""Display the actual acf interval for the search."""
tolerance = self.acf_spinbox.value() * self.acf_confidence.value() / 100
if tolerance > 0:
val = round(self.acf_spinbox.value() - tolerance, Constants.MAX_DIGITS)
@@ -1033,12 +1159,17 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot()
def activate_if_toggled(self, radio_btn, *widgets):
"""If radio_btn is toggled, activate all *widgets.
Do nothing otherwise.
"""
toggled = radio_btn.isChecked()
for w in widgets[:-1]: # Neglect the bool coming from the emitted signal.
w.setEnabled(toggled)
@pyqtSlot()
def display_signals(self):
"""Display all the signal names which matches the applied filters."""
text = self.search_bar.text()
available_signals = 0
for index, signal_name in enumerate(self.signal_names):
@@ -1050,15 +1181,16 @@ class Artemis(QMainWindow, Ui_MainWindow):
self.modulation_filters_ok(signal_name),
self.location_filters_ok(signal_name),
self.acf_filters_ok(signal_name)]):
self.result_list.item(index).setHidden(False)
self.signals_list.item(index).setHidden(False)
available_signals += 1
else:
self.result_list.item(index).setHidden(True)
self.signals_list.item(index).setHidden(True)
# Remove selected item.
self.result_list.setCurrentItem(None)
self.signals_list.setCurrentItem(None)
self.update_status_tip(available_signals)
def update_status_tip(self, available_signals):
"""Display the number of displayed signals in the status tip."""
if available_signals < self.total_signals:
self.statusbar.setStyleSheet(f'color: {self.active_color}')
else:
@@ -1069,6 +1201,10 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot()
def reset_fb_filters(self, ftype):
"""Reset the Frequency or Bandwidth depending on 'ftype'.
ftype can be either Ftype.FREQ or Ftype.BAND.
"""
if ftype != Ftype.FREQ and ftype != Ftype.BAND:
raise ValueError("Wrong ftype in function 'reset_fb_filters'")
@@ -1103,6 +1239,7 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot()
def reset_cat_filters(self):
"""Reset the category filter screen."""
uncheck_and_emit(self.apply_remove_cat_filter_btn)
for f in self.cat_filter_btns:
if f.isChecked():
@@ -1111,6 +1248,7 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot()
def reset_mode_filters(self):
"""Reset the mode filter screen."""
uncheck_and_emit(self.apply_remove_mode_filter_btn)
parents = Constants.MODES.keys()
selected_children = []
@@ -1126,6 +1264,7 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot()
def reset_modulation_filters(self):
"""Reset the modulation filter screen."""
uncheck_and_emit(self.apply_remove_modulation_filter_btn)
self.search_bar_modulation.setText('')
self.show_matching_strings(
@@ -1138,6 +1277,7 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot()
def reset_location_filters(self):
"""Reset the location filter screen."""
uncheck_and_emit(self.apply_remove_location_filter_btn)
self.search_bar_location.setText('')
self.show_matching_strings(
@@ -1150,6 +1290,7 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot()
def reset_acf_filters(self):
"""Reset the acf filter screen."""
uncheck_and_emit(self.apply_remove_acf_filter_btn)
if self.include_undef_acf.isChecked():
self.include_undef_acf.setChecked(False)
@@ -1157,6 +1298,7 @@ class Artemis(QMainWindow, Ui_MainWindow):
self.acf_confidence.setValue(0)
def frequency_filters_ok(self, signal_name):
"""Evalaute if the signal matches the frequency filters."""
if not self.apply_remove_freq_filter_btn.isChecked():
return True
undef_freq = is_undef_freq(self.db.loc[signal_name])
@@ -1167,8 +1309,8 @@ class Artemis(QMainWindow, Ui_MainWindow):
return False
signal_freqs = (
int(self.db.at[signal_name, Signal.INF_FREQ]),
int(self.db.at[signal_name, Signal.SUP_FREQ])
safe_cast(self.db.at[signal_name, Signal.INF_FREQ], int),
safe_cast(self.db.at[signal_name, Signal.SUP_FREQ], int)
)
band_filter_ok = False
@@ -1196,6 +1338,7 @@ class Artemis(QMainWindow, Ui_MainWindow):
return lower_limit_ok and upper_limit_ok
def band_filters_ok(self, signal_name):
"""Evalaute if the signal matches the band filters."""
if not self.apply_remove_band_filter_btn.isChecked():
return True
undef_band = is_undef_band(self.db.loc[signal_name])
@@ -1206,8 +1349,8 @@ class Artemis(QMainWindow, Ui_MainWindow):
return False
signal_bands = (
int(self.db.at[signal_name, Signal.INF_BAND]),
int(self.db.at[signal_name, Signal.SUP_BAND])
safe_cast(self.db.at[signal_name, Signal.INF_BAND], int),
safe_cast(self.db.at[signal_name, Signal.SUP_BAND], int)
)
lower_limit_ok = True
@@ -1225,6 +1368,7 @@ class Artemis(QMainWindow, Ui_MainWindow):
return lower_limit_ok and upper_limit_ok
def category_filters_ok(self, signal_name):
"""Evalaute if the signal matches the category filters."""
if not self.apply_remove_cat_filter_btn.isChecked():
return True
cat_code = self.db.at[signal_name, Signal.CATEGORY_CODE]
@@ -1241,6 +1385,7 @@ class Artemis(QMainWindow, Ui_MainWindow):
return cat_checked == positive_cases and cat_checked > 0
def mode_filters_ok(self, signal_name):
"""Evalaute if the signal matches the mode filters."""
if not self.apply_remove_mode_filter_btn.isChecked():
return True
signal_mode = self.db.at[signal_name, Signal.MODE]
@@ -1263,27 +1408,40 @@ class Artemis(QMainWindow, Ui_MainWindow):
ok.append(item.text(0) == signal_mode)
return any(ok)
def get_field_entries(self, signal_name, field, separator=Constants.FIELD_SEPARATOR):
"""Take a signal name, a column label and optionally a separator string.
Return a list obtained by splitting the signal field with separator."""
return [
x.strip() for x in self.db.at[signal_name, field].split(separator)
]
def modulation_filters_ok(self, signal_name):
"""Evalaute if the signal matches the modulation filters."""
if not self.apply_remove_modulation_filter_btn.isChecked():
return True
signal_modulation = [
x.strip() for x in self.db.at[signal_name, Signal.MODULATION].split(',')
]
signal_modulation = self.get_field_entries(
signal_name, Signal.MODULATION
)
for item in self.modulation_list.selectedItems():
if item.text() in signal_modulation:
return True
return False
def location_filters_ok(self, signal_name):
"""Evalaute if the signal matches the location filters."""
if not self.apply_remove_location_filter_btn.isChecked():
return True
signal_location = self.db.at[signal_name, Signal.LOCATION]
signal_locations = self.get_field_entries(
signal_name, Signal.LOCATION
)
for item in self.locations_list.selectedItems():
if item.text() == signal_location:
if item.text() in signal_locations:
return True
return False
def acf_filters_ok(self, signal_name):
"""Evalaute if the signal matches the acf filters."""
if not self.apply_remove_acf_filter_btn.isChecked():
return True
signal_acf = self.db.at[signal_name, Signal.ACF]
@@ -1293,7 +1451,7 @@ class Artemis(QMainWindow, Ui_MainWindow):
else:
return False
else:
signal_acf = float(signal_acf.rstrip("ms"))
signal_acf = safe_cast(signal_acf.rstrip("ms"), float)
tolerance = self.acf_spinbox.value() * self.acf_confidence.value() / 100
upper_limit = self.acf_spinbox.value() + tolerance
lower_limit = self.acf_spinbox.value() - tolerance
@@ -1304,6 +1462,11 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot(QListWidgetItem, QListWidgetItem)
def display_specs(self, item, previous_item):
"""Display the signal properties.
'item' is the item corresponding to the selected signal
'previous_item' is unused.
"""
self.display_spectrogram()
if item is not None:
self.current_signal_name = item.text()
@@ -1369,11 +1532,9 @@ class Artemis(QMainWindow, Ui_MainWindow):
self.audio_widget.set_audio_player()
def display_spectrogram(self):
default_pic = os.path.join(
Constants.DEFAULT_IMGS_FOLDER,
Constants.NOT_SELECTED
)
item = self.result_list.currentItem()
"""Display the selected signal's waterfall."""
default_pic = Constants.DEFAULT_NOT_SELECTED
item = self.signals_list.currentItem()
if item:
spectrogram_name = item.text()
path_spectr = os.path.join(
@@ -1382,23 +1543,32 @@ class Artemis(QMainWindow, Ui_MainWindow):
spectrogram_name + Constants.SPECTRA_EXT
)
if not QFileInfo(path_spectr).exists():
path_spectr = os.path.join(
Constants.DEFAULT_IMGS_FOLDER,
Constants.NOT_AVAILABLE
)
path_spectr = Constants.DEFAULT_NOT_AVAILABLE
else:
path_spectr = default_pic
self.spectrogram.setPixmap(QPixmap(path_spectr))
def activate_band_category(self, band_label, activate=True):
"""Highlight the given band_label.
If activate is False remove the highlight (default to True).
"""
color = self.active_color if activate else self.inactive_color
for label in band_label:
label.setStyleSheet(f"color: {color};")
def set_band_range(self, current_signal=None):
"""Highlight the signal's band labels.
If no signal is selected remove all highlights.
"""
if current_signal is not None and not is_undef_freq(current_signal):
lower_freq = int(current_signal.at[Signal.INF_FREQ])
upper_freq = int(current_signal.at[Signal.SUP_FREQ])
lower_freq = safe_cast(
current_signal.at[Signal.INF_FREQ], int
)
upper_freq = safe_cast(
current_signal.at[Signal.SUP_FREQ], int
)
zipped = list(zip(Constants.BANDS, self.band_labels))
for i, w in enumerate(zipped):
band, band_label = w
@@ -1418,6 +1588,10 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot()
def reset_all_filters(self):
"""Reset all filter screens.
Show all available signals.
"""
self.reset_frequency_filters_btn.clicked.emit()
self.reset_band_filters_btn.clicked.emit()
self.reset_cat_filters_btn.clicked.emit()
@@ -1428,6 +1602,10 @@ class Artemis(QMainWindow, Ui_MainWindow):
@pyqtSlot()
def go_to_web_page_signal(self):
"""Go the web page of the signal's wiki.
Do nothing if no signal is selected.
"""
if self.current_signal_name:
self.url_button.setStyleSheet(
f"color: {self.url_button.colors.clicked}"
@@ -1436,6 +1614,9 @@ class Artemis(QMainWindow, Ui_MainWindow):
self.db.at[self.current_signal_name, Signal.WIKI_CLICKED] = True
def closeEvent(self, event):
"""Extends closeEvent of QMainWindow.
Shutdown all active threads and close all open windows."""
self.closing = True
if self.download_window.isVisible():
self.download_window.close()
@@ -1448,11 +1629,12 @@ class Artemis(QMainWindow, Ui_MainWindow):
if __name__ == '__main__':
my_app = QApplication(sys.argv)
img = QPixmap(":/icon/default_pics/Artemis3.500px.png")
ARTEMIS_ICON = os.path.join(":", "icon", "default_pics", "Artemis3.500px.png")
img = QPixmap(ARTEMIS_ICON)
splash = QSplashScreen(img)
splash.show()
start = time()
while time() - start < 2:
while time() - start < 1.5:
sleep(0.001)
my_app.processEvents()
splash.close()

View File

@@ -21,7 +21,7 @@
</property>
<property name="windowIcon">
<iconset resource="default_imgs.qrc">
<normaloff>:/icon/default_pics/Artemis3.ico</normaloff>:/icon/default_pics/Artemis3.ico</iconset>
<normaloff>:/icon/default_pics/Artemis3.500px.png</normaloff>:/icon/default_pics/Artemis3.500px.png</iconset>
</property>
<property name="styleSheet">
<string notr="true"/>
@@ -119,7 +119,7 @@
</widget>
</item>
<item row="1" column="0" colspan="2">
<widget class="QListWidget" name="result_list">
<widget class="QListWidget" name="signals_list">
<property name="sizePolicy">
<sizepolicy hsizetype="Preferred" vsizetype="Expanding">
<horstretch>0</horstretch>
@@ -187,7 +187,7 @@
<number>0</number>
</property>
<property name="movable">
<bool>true</bool>
<bool>false</bool>
</property>
<widget class="QWidget" name="signal_properties_tab">
<property name="minimumSize">

View File

@@ -1,5 +1,4 @@
import os
from pydub import AudioSegment
from pygame import mixer
from PyQt5.QtCore import QTimer, pyqtSlot, QObject
@@ -7,123 +6,139 @@ from constants import Constants
import qtawesome as qta
class AudioPlayer(QObject): # Maybe useless inheriting from QObject
"""This is the audio player widget. The only public methods are the __init__
class AudioPlayer(QObject):
"""Subclass QObject. Audio player widget for the audio samples.
The only public methods are the __init__
method, set_audio_player, which loads the current file and refresh_btns_colors.
Everything else is managed internally."""
__time_step = 500 # Milliseconds.
_TIME_STEP = 500 # Milliseconds.
def __init__(self, play, pause, stop, volume, audio_progress, active_color, inactive_color):
def __init__(self, play,
pause,
stop,
volume,
audio_progress,
active_color,
inactive_color):
"""Initialize the player."""
super().__init__()
self.__paused = False
self.__first_call = True
self.__play = play
self.__pause = pause
self.__stop = stop
self.__volume = volume
self.__audio_progress = audio_progress
self.__audio_file = None
self.__timer = QTimer()
self.__timer.timeout.connect(self.__update_bar)
self.__play.clicked.connect(self.__play_audio)
self.__pause.clicked.connect(self.__pause_audio)
self.__stop.clicked.connect(self.__stop_audio)
self.__volume.valueChanged.connect(self.__set_volume)
self.__play.setIconSize(self.__play.size())
self.__pause.setIconSize(self.__pause.size())
self.__stop.setIconSize(self.__stop.size())
self._paused = False
self._first_call = True
self._play = play
self._pause = pause
self._stop = stop
self._volume = volume
self._audio_progress = audio_progress
self._audio_file = None
self._timer = QTimer()
self._timer.timeout.connect(self._update_bar)
self._play.clicked.connect(self._play_audio)
self._pause.clicked.connect(self._pause_audio)
self._stop.clicked.connect(self._stop_audio)
self._volume.valueChanged.connect(self._set_volume)
self._play.setIconSize(self._play.size())
self._pause.setIconSize(self._pause.size())
self._stop.setIconSize(self._stop.size())
self.refresh_btns_colors(active_color, inactive_color)
def refresh_btns_colors(self, active_color, inactive_color):
self.__play.setIcon(qta.icon('fa5.play-circle',
"""Repaint the buttons of the widgetd after the theme has changed."""
self._play.setIcon(qta.icon('fa5.play-circle',
color=active_color,
color_disabled=inactive_color))
self.__pause.setIcon(qta.icon('fa5.pause-circle',
self._pause.setIcon(qta.icon('fa5.pause-circle',
color=active_color,
color_disabled=inactive_color))
self.__stop.setIcon(qta.icon('fa5.stop-circle',
self._stop.setIcon(qta.icon('fa5.stop-circle',
color=active_color,
color_disabled=inactive_color))
@pyqtSlot()
def __set_volume(self):
def _set_volume(self):
"""Set the volume of the audio samples."""
if mixer.get_init():
mixer.music.set_volume(
self.__volume.value() / self.__volume.maximum()
self._volume.value() / self._volume.maximum()
)
def __reset_audio_widget(self):
def _reset_audio_widget(self):
"""Reset the widget. Stop all playing samples."""
if mixer.get_init():
if mixer.music.get_busy():
mixer.music.stop()
self.__timer.stop()
self._timer.stop()
mixer.quit()
self.__audio_progress.reset()
self.__enable_buttons(False, False, False)
self.__paused = False
self._audio_progress.reset()
self._enable_buttons(False, False, False)
self._paused = False
@pyqtSlot()
def __update_bar(self):
def _update_bar(self):
"""Update the progress bar."""
pos = mixer.music.get_pos()
if pos == -1:
self.__timer.stop()
self.__audio_progress.reset()
self.__enable_buttons(True, False, False)
self._timer.stop()
self._audio_progress.reset()
self._enable_buttons(True, False, False)
else:
self.__audio_progress.setValue(pos)
self._audio_progress.setValue(pos)
def __set_max_progress_bar(self):
self.__audio_progress.setMaximum(
mixer.Sound(self.__audio_file).get_length() * 1000
def _set_max_progress_bar(self):
"""Set the maximum value of the progress bar."""
self._audio_progress.setMaximum(
mixer.Sound(self._audio_file).get_length() * 1000
)
def set_audio_player(self, fname=""):
self.__first_call = True
self.__reset_audio_widget()
"""Set the current audio sample."""
self._first_call = True
self._reset_audio_widget()
full_name = os.path.join(
Constants.DATA_FOLDER,
Constants.AUDIO_FOLDER,
fname + '.ogg'
)
if os.path.exists(full_name):
self.__play.setEnabled(True)
self.__audio_file = full_name
self._play.setEnabled(True)
self._audio_file = full_name
@pyqtSlot()
def __play_audio(self):
if not self.__paused:
if self.__first_call:
self.__first_call = False
mixer.init(frequency=AudioSegment.from_ogg(
self.__audio_file
).frame_rate,
buffer=2048)
mixer.music.load(self.__audio_file)
self.__set_volume()
self.__set_max_progress_bar()
def _play_audio(self):
"""Play the audio sample."""
if not self._paused:
if self._first_call:
self._first_call = False
mixer.init(48000, -16, 1, 1024)
mixer.music.load(self._audio_file)
self._set_volume()
self._set_max_progress_bar()
mixer.music.play()
else:
mixer.music.unpause()
self.__paused = False
self.__timer.start(self.__time_step)
self.__enable_buttons(False, True, True)
self._paused = False
self._timer.start(self._TIME_STEP)
self._enable_buttons(False, True, True)
@pyqtSlot()
def __stop_audio(self):
def _stop_audio(self):
"""Stop the audio sample."""
mixer.music.stop()
self.__audio_progress.reset()
self.__timer.stop()
self.__enable_buttons(True, False, False)
self._audio_progress.reset()
self._timer.stop()
self._enable_buttons(True, False, False)
@pyqtSlot()
def __pause_audio(self):
def _pause_audio(self):
"""Pause the audio sample."""
mixer.music.pause()
self.__timer.stop()
self.__paused = True
self.__enable_buttons(True, False, False)
self._timer.stop()
self._paused = True
self._enable_buttons(True, False, False)
def __enable_buttons(self, play_en, pause_en, stop_en):
self.__play.setEnabled(play_en)
self.__pause.setEnabled(pause_en)
self.__stop.setEnabled(stop_en)
def _enable_buttons(self, play_en, pause_en, stop_en):
"""Set the three buttons status."""
self._play.setEnabled(play_en)
self._pause.setEnabled(pause_en)
self._stop.setEnabled(stop_en)

View File

@@ -1,75 +0,0 @@
import os
from os import listdir
from os.path import isfile, join
mypath='../..'
excluded=['.gitignore','requirements_win.txt','artemis.py']
data_files = [f for f in listdir(mypath) if isfile(join(mypath, f))]
for i in excluded:
data_files.remove(i)
datas=["('../../" + i + "', '.')" for i in data_files]
pyinst_head='''
# -*- mode: python -*-
block_cipher = None
a = Analysis(['../../artemis.py'],
pathex=['../../'],
binaries=[],
datas=[
'''
pyinst_tail='''
hiddenimports=[],
hookspath=[],
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False)
pyz = PYZ(a.pure, a.zipped_data,
cipher=block_cipher)
exe = EXE(pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='Artemis',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
runtime_tmpdir=None,
console=True)
'''
setup_file = open('./setup.spec','w')
setup_file.write(pyinst_head + ','.join(datas) + "]," + pyinst_tail)
setup_file.close()
os.system("pyinstaller --onefile setup.spec")
os.system("cp -r ../../themes dist")
os.system("rm -rf build")
desktop = open('./artemis.desktop','w')
desktop.write("""#!/usr/bin/env xdg-open
[Desktop Entry]
Name=Artemis
StartupWMClass=artemis3
Exec=. /SETUP_PATH/Artemis
Terminal=False
Icon=artemis3
Type=Application""")
desktop.close()
print("""To finalize the installation (add Artemis in the main menu):\n
1)\tEdit artemis.desktop file properly and move it to '/.local/share/applications'
2)\tMove the icon file artemis3.svg to '/usr/share/icons/'
""")

View File

@@ -4,30 +4,31 @@ from constants import Constants
class ClickableProgressBar(QProgressBar):
"""Subclass QProgressBar. Clickable progress bar class."""
clicked = pyqtSignal()
def __init__(self, parent=None):
self.__text = ''
"""Initialize the instance."""
self._text = ''
super().__init__(parent)
# def __set_text(self, text):
# self.__text = text
def text(self):
return self.__text
"""Return the text displayed on the bar."""
return self._text
def set_idle(self):
# self.__set_text(Constants.CLICK_TO_UPDATE_STR)
self.__text = Constants.CLICK_TO_UPDATE_STR
"""Set the bar to a non-downloading status."""
self._text = Constants.CLICK_TO_UPDATE_STR
self.setMaximum(self.minimum() + 1)
def set_updating(self):
# self.__set_text(Constants.UPDATING_STR)
self.__text = Constants.UPDATING_STR
"""Set the bar to a downloading status."""
self._text = Constants.UPDATING_STR
self.setMaximum(self.minimum())
def mousePressEvent(self, event):
"""Override QWidget.mousePressEvent. Detect a click on the bar."""
if event.button() == Qt.LeftButton:
self.clicked.emit()
else:

View File

@@ -1,24 +1,45 @@
from collections import namedtuple
from enum import Enum, auto
import os.path
from enum import IntEnum
class MainTabs(IntEnum):
"""The main tabs indeces."""
SIGNAL = 0
FILTERS = 1
GFD = 2
FORECAST = 3
class Ftype:
"""Container class to differentiate between frequency and band.
Used in reset_fb_filters.
"""
FREQ = "freq"
BAND = "band"
class GfdType(Enum):
"""Enum class to differentiate the possible GFD search criterias."""
FREQ = auto()
LOC = auto()
class ChecksumWhat(Enum):
"""Enum class to distinguish the object you want to verify the checksum."""
FOLDER = auto()
DB = auto()
class Messages:
"""Container class for messages to be displayed."""
DB_UP_TO_DATE = "Already up to date"
DB_UP_TO_DATE_MSG = "No newer version to download."
DB_NEW_VER = "New version available"
@@ -34,6 +55,8 @@ class Messages:
class Signal:
"""Container class for the signal property names."""
NAME = "name"
INF_FREQ = "inf_freq"
SUP_FREQ = "sup_freq"
@@ -50,6 +73,8 @@ class Signal:
class Database:
"""Container class for the database-related constants."""
LINK_LOC = "https://aresvalley.com/Storage/Artemis/Database/data.zip"
LINK_REF = "https://aresvalley.com/Storage/Artemis/Database/data.zip.log"
NAME = "db.csv"
@@ -64,17 +89,19 @@ class Database:
Signal.DESCRIPTION,
Signal.MODULATION,
Signal.CATEGORY_CODE,
Signal.ACF,)
Signal.ACF)
DELIMITER = "*"
STRINGS = (Signal.INF_FREQ,
Signal.SUP_FREQ,
Signal.MODE,
Signal.INF_BAND,
Signal.SUP_BAND,
Signal.CATEGORY_CODE,)
Signal.CATEGORY_CODE)
class ForecastColors:
"""Container class for the forecast labels colors."""
WARNING_COLOR = "#F95423"
KP9_COLOR = "#FFCCCB"
KP8_COLOR = "#FFCC9A"
@@ -83,7 +110,12 @@ class ForecastColors:
KP5_COLOR = "#BEE3FE"
_Band = namedtuple("Band", ["lower", "upper"])
class Constants:
"""Container class for several constants of the software."""
CLICK_TO_UPDATE_STR = "Click to update"
SIGIDWIKI = "https://www.sigidwiki.com/wiki/Signal_Identification_Guide"
ADD_SIGNAL_LINK = "https://www.sigidwiki.com/index.php/Special:FormEdit/Signal/?preload=Signal_Identification_Wiki:Signal_form_preload_text"
@@ -119,21 +151,18 @@ class Constants:
LABEL_ON_COLOR = "on"
LABEL_OFF_COLOR = "off"
TEXT_COLOR = "text"
NOT_AVAILABLE = "spectrumnotavailable.png"
NOT_SELECTED = "nosignalselected.png"
__Band = namedtuple("Band", ["lower", "upper"])
__ELF = __Band(0, 30) # Formally it is (3, 30) Hz.
__SLF = __Band(30, 300)
__ULF = __Band(300, 3000)
__VLF = __Band(3000, 30000)
__LF = __Band(30 * 10**3, 300 * 10**3)
__MF = __Band(300 * 10 ** 3, 3000 * 10**3)
__HF = __Band(3 * 10**6, 30 * 10**6)
__VHF = __Band(30 * 10**6, 300 * 10**6)
__UHF = __Band(300 * 10**6, 3000 * 10**6)
__SHF = __Band(3 * 10**9, 30 * 10**9)
__EHF = __Band(30 * 10**9, 300 * 10**9)
BANDS = (__ELF, __SLF, __ULF, __VLF, __LF, __MF, __HF, __VHF, __UHF, __SHF, __EHF)
_ELF = _Band(0, 30) # Formally it is (3, 30) Hz.
_SLF = _Band(30, 300)
_ULF = _Band(300, 3000)
_VLF = _Band(3000, 30000)
_LF = _Band(30 * 10**3, 300 * 10**3)
_MF = _Band(300 * 10 ** 3, 3000 * 10**3)
_HF = _Band(3 * 10**6, 30 * 10**6)
_VHF = _Band(30 * 10**6, 300 * 10**6)
_UHF = _Band(300 * 10**6, 3000 * 10**6)
_SHF = _Band(3 * 10**9, 30 * 10**9)
_EHF = _Band(30 * 10**9, 300 * 10**9)
BANDS = (_ELF, _SLF, _ULF, _VLF, _LF, _MF, _HF, _VHF, _UHF, _SHF, _EHF)
MAX_DIGITS = 3
RANGE_SEPARATOR = ' ÷ '
GFD_SITE = "http://qrg.globaltuners.com/"
@@ -149,10 +178,15 @@ class Constants:
"Chirp Spread Spectrum": (),
"FHSS-TDM": (),
"RAW": (),
"SC-FDMA": (),}
"SC-FDMA": ()}
APPLY = "Apply"
REMOVE = "Remove"
UNKNOWN = "N/A"
EXTRACTING_MSG = "Extracting..."
EXTRACTING_CODE = -1
NOT_AVAILABLE = "spectrumnotavailable.png"
NOT_SELECTED = "nosignalselected.png"
FIELD_SEPARATOR = ";"
DEFAULT_IMGS_FOLDER = os.path.join(":", "pics", "default_pics")
DEFAULT_NOT_SELECTED = os.path.join(DEFAULT_IMGS_FOLDER, NOT_SELECTED)
DEFAULT_NOT_AVAILABLE = os.path.join(DEFAULT_IMGS_FOLDER, NOT_AVAILABLE)

View File

@@ -1,8 +1,6 @@
<RCC>
<qresource prefix="icon">
<file>default_pics/Artemis3.500px.png</file>
<file>default_pics/Artemis3.ico</file>
<file>default_pics/Artemis3.png</file>
</qresource>
<qresource prefix="pics">
<file>default_pics/nosignalselected.png</file>

File diff suppressed because it is too large Load Diff

View File

Before

Width:  |  Height:  |  Size: 36 KiB

After

Width:  |  Height:  |  Size: 36 KiB

BIN
deploy/Windows/artemis3.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 126 KiB

View File

@@ -0,0 +1,56 @@
@echo off
REM Check and gain admin permissions
IF "%PROCESSOR_ARCHITECTURE%" EQU "amd64" (
>nul 2>&1 "%SYSTEMROOT%\SysWOW64\cacls.exe" "%SYSTEMROOT%\SysWOW64\config\system"
) ELSE (
>nul 2>&1 "%SYSTEMROOT%\system32\cacls.exe" "%SYSTEMROOT%\system32\config\system"
)
if '%errorlevel%' NEQ '0' (
echo Requesting administrative privileges...
goto UACPrompt
) else ( goto gotAdmin )
:UACPrompt
echo Set UAC = CreateObject^("Shell.Application"^) > "%temp%\getadmin.vbs"
set params= %*
echo UAC.ShellExecute "cmd.exe", "/c ""%~s0"" %params:"=""%", "", "runas", 1 >> "%temp%\getadmin.vbs"
"%temp%\getadmin.vbs"
del "%temp%\getadmin.vbs"
exit /B
:gotAdmin
pushd "%CD%"
CD /D "%~dp0"
REM Set the correct permissions for Artemis folder
set artemis_path=%~dp0..\..
icacls "%artemis_path%" /grant %USERNAME%:(OI)(CI)F /T > nul
REM Download necessary libraries with pip3
set choice=Y
set /p choice=Download necessary Python libraries? [Y]/N
if /I '%choice%'=='Y' pip3 install -r %~dp0requirements_win.txt
echo:
REM Create a shortcut
set choice=Y
set /p choice=Create a desktop shortcut? [Y]/N
if /I '%choice%'=='N' goto end
ren "%artemis_path%\Artemis.py" "Artemis.pyw"
echo Set oWS = WScript.CreateObject("WScript.Shell") > CreateShortcut.vbs
echo sLinkFile = "%USERPROFILE%\Desktop\Artemis.lnk" >> CreateShortcut.vbs
echo Set oLink = oWS.CreateShortcut(sLinkFile) >> CreateShortcut.vbs
echo oLink.TargetPath = "%artemis_path%\Artemis.pyw" >> CreateShortcut.vbs
echo oLink.WorkingDirectory = "%artemis_path%" >> CreateShortcut.vbs
echo oLink.IconLocation = "%~dp0artemis3.ico" >> CreateShortcut.vbs
echo oLink.Save >> CreateShortcut.vbs
cscript /nologo CreateShortcut.vbs
del CreateShortcut.vbs
:end
echo:
echo SETTING COMPLETE!
echo:
pause

View File

@@ -1,38 +1,55 @@
from PyQt5.QtWidgets import QPushButton
from PyQt5.QtCore import pyqtSlot
class DoubleTextButton(QPushButton):
"""Subclass QPushButton.
A click will deactivate/activate a series of 'slave' widgets depending
on the 'checked' status of the button."""
def __init__(self, parent=None):
"""Extends QPushButton.__init__."""
super().__init__(parent)
self.clicked.connect(self.__manage_click)
self.clicked.connect(self._manage_click)
def set_texts(self, text_a, text_b):
self.__text_a = text_a
self.__text_b = text_b
"""Set the two texts to be displayed."""
self._text_a = text_a
self._text_b = text_b
def set_slave_filters(self, simple_ones=None,
radio_1=None,
ruled_by_radio_1=None,
radio_2=None,
ruled_by_radio_2=None):
self.__simple_ones = simple_ones
self.__ruled_by_radio_1 = ruled_by_radio_1
self.__radio_1 = radio_1
self.__ruled_by_radio_2 = ruled_by_radio_2
self.__radio_2 = radio_2
"""Set all the 'slave' widgets.
Keyword arguments:
simple_ones -- a list of widgets.
radio_1 -- a radio button.
ruled_by_radio_1 -- a list of widgets whose status depend upon radio_1.
radio_2 -- a radio button.
ruled_by_radio_2 -- a list of widgets whose status depend upon radio_2."""
self._simple_ones = simple_ones
self._ruled_by_radio_1 = ruled_by_radio_1
self._radio_1 = radio_1
self._ruled_by_radio_2 = ruled_by_radio_2
self._radio_2 = radio_2
@pyqtSlot()
def __manage_click(self):
def _manage_click(self):
"""Set the status of all the 'slave widgets' based on the status of the instance."""
if self.isChecked():
self.setText(self.__text_b)
self.setText(self._text_b)
enable = False
else:
self.setText(self.__text_a)
self.setText(self._text_a)
enable = True
for f in self.__simple_ones:
for f in self._simple_ones:
f.setEnabled(enable)
radio_btns = self.__radio_1, self.__radio_2
ruled_widgets = self.__ruled_by_radio_1, self.__ruled_by_radio_2
radio_btns = self._radio_1, self._radio_2
ruled_widgets = self._ruled_by_radio_1, self._ruled_by_radio_2
for radio_btn, ruled_by in zip(radio_btns, ruled_widgets):
if ruled_by:
for f in ruled_by:

View File

@@ -15,7 +15,7 @@
</property>
<property name="windowIcon">
<iconset resource="default_imgs.qrc">
<normaloff>:/icon/default_pics/Artemis3.ico</normaloff>:/icon/default_pics/Artemis3.ico</iconset>
<normaloff>:/icon/default_pics/Artemis3.500px.png</normaloff>:/icon/default_pics/Artemis3.500px.png</iconset>
</property>
<property name="styleSheet">
<string notr="true"/>

View File

@@ -5,75 +5,91 @@ from threads import DownloadThread, ThreadStatus
from utilities import pop_up, resource_path
from constants import Constants, Messages
Ui_Download_window, _ = uic.loadUiType(resource_path("download_db_window.ui"))
Ui_Download_window, _ = uic.loadUiType(
resource_path("download_db_window.ui")
)
class DownloadWindow(QWidget, Ui_Download_window):
"""Subclass QWidget and Ui_Download_window. It is the window displayed during the database download."""
complete = pyqtSignal()
closed = pyqtSignal()
def __init__(self):
"""Initialize the window."""
super().__init__()
self.setupUi(self)
self.setWindowFlags(
# Qt.Window |
Qt.CustomizeWindowHint |
Qt.WindowTitleHint |
Qt.WindowCloseButtonHint #|
# Qt.WindowStaysOnTopHint
Qt.WindowCloseButtonHint |
Qt.WindowStaysOnTopHint
)
self.__no_internet_msg = pop_up(self, title=Messages.NO_CONNECTION,
self._no_internet_msg = pop_up(self, title=Messages.NO_CONNECTION,
text=Messages.NO_CONNECTION_MSG,
connection=self.close)
self.__bad_db_download_msg = pop_up(self, title=Messages.BAD_DOWNLOAD,
self._bad_db_download_msg = pop_up(self, title=Messages.BAD_DOWNLOAD,
text=Messages.BAD_DOWNLOAD_MSG,
connection=self.close)
self.__download_thread = DownloadThread()
self.__download_thread.finished.connect(self.__wait_close)
self.__download_thread.progress.connect(self.__display_progress)
self.cancel_btn.clicked.connect(self.__terminate_process)
self._download_thread = DownloadThread()
self._download_thread.finished.connect(self._wait_close)
self._download_thread.progress.connect(self._display_progress)
self.closed.connect(self._download_thread.set_exit)
self.cancel_btn.clicked.connect(self._terminate_process)
def start_download(self):
self.__download_thread.start()
"""Start the download thread."""
self._download_thread.start()
def __downlaod_format_str(self, n, speed):
return f"Downloaded MB: {n}\nSpeed: {speed} MB/s"
def _downlaod_format_str(self, n, speed):
"""Return a well-formatted string with downloaded MB and speed."""
return f"Downloaded: {n} MB\nSpeed: {speed} MB/s"
def show(self):
self.status_lbl.setText(self.__downlaod_format_str(0, 0))
"""Extends QWidget.show. Set downloaded MB and speed to zero."""
self.status_lbl.setText(self._downlaod_format_str(0, 0))
super().show()
@pyqtSlot(int, float)
def __display_progress(self, progress, speed):
def _display_progress(self, progress, speed):
"""Display the downloaded MB and speed."""
if progress != Constants.EXTRACTING_CODE:
self.status_lbl.setText(self.__downlaod_format_str(progress, speed))
self.status_lbl.setText(self._downlaod_format_str(progress, speed))
elif progress == Constants.EXTRACTING_CODE:
self.status_lbl.setText(Constants.EXTRACTING_MSG + '\n')
def _stop_thread(self):
"""Ask the download thread to stop."""
if self._download_thread.isRunning():
self.closed.emit()
self._download_thread.wait()
@pyqtSlot()
def __terminate_process(self):
if self.__download_thread.isRunning():
self.__download_thread.terminate()
self.__download_thread.wait()
def _terminate_process(self):
"""Terminate the download thread and close."""
self._stop_thread()
self.close()
@pyqtSlot()
def __wait_close(self):
if self.__download_thread.status is ThreadStatus.OK:
def _wait_close(self):
"""Decide the action based on the download thread status and close."""
if self._download_thread.status is ThreadStatus.OK:
self.complete.emit()
self.close()
elif self.__download_thread.status is ThreadStatus.NO_CONNECTION_ERR:
self.__no_internet_msg.show()
elif self.__download_thread.status is ThreadStatus.BAD_DOWNLOAD_ERR:
self.__bad_db_download_msg.show()
elif self._download_thread.status is ThreadStatus.NO_CONNECTION_ERR:
self._no_internet_msg.show()
elif self._download_thread.status is ThreadStatus.BAD_DOWNLOAD_ERR:
self._bad_db_download_msg.show()
else:
self.close()
def reject(self):
if self.__download_thread.isRunning():
self.__download_thread.terminate()
self.__download_thread.wait()
"""Extends QWidget.reject. Terminate the download thread."""
self._stop_thread()
super().reject()

View File

@@ -3,21 +3,28 @@ from PyQt5.QtCore import Qt
class FixedAspectRatioLabel(QLabel):
"""Subclass QLabel. A resizable label class."""
def __init__(self, parent=None):
"""Initialize the instance. Set the pixmap to None."""
super().__init__(parent)
self.pixmap = None
def set_default_stylesheet(self):
"""Set the initial stylesheet of the label."""
self.setStyleSheet("""border-width: 1px;
border-style: solid;
border-color: black;"""
)
border-color: black;""")
def make_transparent(self):
"""Make the label transparent.
Remove text and border."""
self.setText('')
self.setStyleSheet("border-width: 0px;")
def apply_pixmap(self):
"""Apply a scaled pixmap without modifying the dimension of the original one."""
if self.pixmap:
self.setPixmap(
self.pixmap.scaled(
@@ -26,5 +33,6 @@ class FixedAspectRatioLabel(QLabel):
)
def rescale(self, size):
"""Rescale the widget and the displayed pixmap to the given size."""
self.resize(size)
self.apply_pixmap()

View File

@@ -3,14 +3,19 @@ from PyQt5.QtCore import QSize
class FixedAspectRatioWidget(QWidget):
space = 10
"""Subclass QWidget. Keep all the internal labels to a fixed aspect ratio."""
SPACE = 10
def __init__(self, parent=None):
"""Initialize the instance."""
super().__init__(parent)
self.labels = []
def resizeEvent(self, event):
"""Override QWidget.resizeEvent. Rescale all the internal widgets."""
h, w = self.height(), self.width()
h_lbl = h / 9 - self.space
h_lbl = h / 9 - self.SPACE
w_lbl = 5 * h_lbl
w_pad = w - 10
if w_lbl > w_pad:

View File

@@ -3,34 +3,49 @@ from constants import ForecastColors
class _BaseSwitchableLabel(QLabel):
"""Subclass QLabel. Base class for the switchable labels."""
def __init__(self, parent=None):
"""Set is_on to False and level to 0."""
super().__init__(parent)
self.is_on = False
self.level = 0
def switch_on(self):
"""Set is_on to True."""
self.is_on = True
def switch_off(self):
"""Set is_on to False."""
self.is_on = False
class SwitchableLabel(_BaseSwitchableLabel):
"""Subclass _BaseSwitchableLabel."""
def __init__(self, parent=None):
"""Define text and colors attributes."""
super().__init__(parent)
self.switch_on_colors = ()
self.switch_off_colors = ()
self.text_color = ''
def switch_on(self):
"""Extend _BaseSwitchableLabel.switch_on.
Apply the active state colors."""
super().switch_on()
self.__apply_colors(*self.switch_on_colors)
self._apply_colors(*self.switch_on_colors)
def switch_off(self):
super().switch_off()
self.__apply_colors(*self.switch_off_colors)
"""Extend _BaseSwitchableLabel.switch_off.
def __apply_colors(self, start, end):
Apply the inactive state colors."""
super().switch_off()
self._apply_colors(*self.switch_off_colors)
def _apply_colors(self, start, end):
"""Set text and background color of the label."""
self.setStyleSheet(
f"""
color:{self.text_color};
@@ -40,25 +55,33 @@ class SwitchableLabel(_BaseSwitchableLabel):
class SingleColorSwitchableLabel(_BaseSwitchableLabel):
"""Subclass _BaseSwitchableLabel."""
THRESHOLD = 30
def __init__(self, parent=None):
"""Set default active color."""
super().__init__(parent)
self.active_color = ForecastColors.WARNING_COLOR
def switch_on(self):
if self.level >= 30:
"""Extend _BaseSwitchableLabel.switch_on.
Apply the active state color if level >= THRESHOLD."""
if self.level >= self.THRESHOLD:
super().switch_on()
self.setStyleSheet(f"color: {self.active_color}"
# f"""background-color: {self.active_color};
# color: #000000;"""
)
self.setStyleSheet(f"color: {self.active_color}")
def switch_off(self):
"""Extend _BaseSwitchableLabel.switch_off.
Apply an empty stylesheet."""
super().switch_off()
# self.setStyleSheet("""background-color: transparent;""")
self.setStyleSheet("")
class MultiColorSwitchableLabel(_BaseSwitchableLabel):
"""Subclass _BaseSwitchableLabel."""
LEVEL_COLORS = {
9: ForecastColors.KP9_COLOR,
@@ -68,11 +91,18 @@ class MultiColorSwitchableLabel(_BaseSwitchableLabel):
5: ForecastColors.KP5_COLOR
}
MIN_LEVEL = list(LEVEL_COLORS.keys())[-1]
MAX_LEVEL = list(LEVEL_COLORS.keys())[0]
def __init__(self, parent=None):
"""Initialize the instance."""
super().__init__(parent)
def switch_on(self):
if 5 <= self.level <= 9:
"""Extend _BaseSwitchableLabel.switch_on.
Apply the active state color based on LEVEL_COLORS."""
if self.MIN_LEVEL <= self.level <= self.MAX_LEVEL:
super().switch_on()
self.setStyleSheet(
f"""color: {self.LEVEL_COLORS[self.level]};
@@ -80,20 +110,27 @@ class MultiColorSwitchableLabel(_BaseSwitchableLabel):
)
def switch_off(self):
"""Extend _BaseSwitchableLabel.switch_off.
Apply an empty stylesheet."""
super().switch_off()
# self.setStyleSheet("background-color: transparent;")
self.setStyleSheet("")
class SwitchableLabelsIterable:
"""Iterable class of _BaseSwitchableLabel."""
def __init__(self, *labels):
"""Set the labels to iterate through."""
self.labels = labels
def __iter__(self):
"""Define the iterator."""
for lab in self.labels:
yield lab
def switch_on(self, label):
"""Switch on the label 'label'. Switch off all the other labels."""
for lab in self.labels:
if lab is label:
lab.switch_on()
@@ -101,14 +138,19 @@ class SwitchableLabelsIterable:
lab.switch_off()
def switch_off_all(self):
"""Switch off all the labels."""
for lab in self.labels:
lab.switch_off()
def set(self, attr, value):
"""Set the attribute 'attr' equal to 'value' for all the labels."""
for lab in self.labels:
setattr(lab, attr, value)
def refresh(self):
"""Refresh the state of all the labels.
Used after the applied theme has changed."""
for lab in self.labels:
if lab.is_on:
lab.switch_on()

View File

@@ -11,6 +11,8 @@ from utilities import pop_up
class ThemeConstants:
"""Container class for all the theme-related constants."""
FOLDER = "themes"
EXTENSION = ".qss"
ICONS_FOLDER = "icons"
@@ -27,103 +29,188 @@ class ThemeConstants:
MISSING_THEME = "Missing theme in '" + FOLDER + "' folder."
MISSING_THEME_FOLDER = "'" + FOLDER + "'" + " folder not found.\nOnly the basic theme is available."
THEME_FOLDER_NOT_FOUND = "'" + FOLDER + "'" + " folder not found"
DEFAULT_ICONS_PATH = os.path.join(FOLDER, DEFAULT, ICONS_FOLDER)
DEFAULT_SEARCH_LABEL_PATH = os.path.join(DEFAULT_ICONS_PATH, Constants.SEARCH_LABEL_IMG)
DEFAULT_VOLUME_LABEL_PATH = os.path.join(DEFAULT_ICONS_PATH, Constants.VOLUME_LABEL_IMG)
CURRENT_THEME_FILE = os.path.join(FOLDER, CURRENT)
DEFAULT_THEME_PATH = os.path.join(FOLDER, DEFAULT)
class _ColorsHandler:
"""Manage the theme's secondary colors.
Contains a _Color inner class."""
class _Color:
"""Characterize a color from a string.
Can handle strings representing multiple colors."""
MAX_COLORS = 2
def __init__(self, line):
"""Define the color from the string 'line'.
All relevant features are defined:
- if the format is valid;
- if 'line' represent a single color or a list of colors.
- the 'quality' of the color."""
quality, color_str = line.split(ThemeConstants.COLOR_SEPARATOR)
color_str = color_str.strip()
self.quality = quality.lower().strip()
self.color_str = ''
self.color_list = []
if ',' in color_str:
self.is_simple_string = False
self.color_list = [c.strip() for c in color_str.split(',')]
else:
self.is_simple_string = True
self.color_str = color_str
self.is_valid = self._color_is_valid()
def _color_is_valid(self):
"""Return if the color (or the list of colors) has a valid html format."""
pattern = "#([a-zA-Z0-9]){6}"
def match_ok(col):
return bool(re.match(pattern, col)) and len(col) == 7
if not self.is_simple_string:
if len(self.color_list) <= self.MAX_COLORS:
return all(match_ok(c) for c in self.color_list)
else:
return False
else:
return match_ok(self.color_str)
def __init__(self, simple_color_list, double_color_list):
"""Initialize the lists of valid _Color objects."""
self.simple_color_list = simple_color_list
self.double_color_list = double_color_list
@classmethod
def from_file(cls, colors_str):
"""Return a _ColorsHandler object with two lists of valid _Color objects.
If the file is empty or there are no valid colors return None."""
if colors_str:
simple_color_list = []
double_color_list = []
for line in colors_str.splitlines():
color = cls._Color(line)
if color.is_valid:
if color.is_simple_string:
simple_color_list.append(color)
else:
double_color_list.append(color)
if simple_color_list or double_color_list:
return cls(simple_color_list, double_color_list)
return None
class ThemeManager:
def __init__(self, parent):
self.__parent = parent
self.__parent.active_color = ThemeConstants.DEFAULT_ACTIVE_COLOR
self.__parent.inactive_color = ThemeConstants.DEFAULT_INACTIVE_COLOR
"""Manage all the operations releted to the themes."""
self.__theme_path = ""
self.__current_theme = ""
def __init__(self, owner):
"""Initialize the ThemeManager instance."""
self._owner = owner
self._owner.active_color = ThemeConstants.DEFAULT_ACTIVE_COLOR
self._owner.inactive_color = ThemeConstants.DEFAULT_INACTIVE_COLOR
self.__parent.default_images_folder = os.path.join(
ThemeConstants.FOLDER,
ThemeConstants.DEFAULT,
ThemeConstants.ICONS_FOLDER
)
self._theme_path = ""
self._current_theme = ""
self.__space_weather_labels = SwitchableLabelsIterable(
self._space_weather_labels = SwitchableLabelsIterable(
*list(
chain(
self.__parent.switchable_r_labels,
self.__parent.switchable_s_labels,
self.__parent.switchable_g_now_labels,
self.__parent.switchable_g_today_labels,
self.__parent.k_storm_labels,
self.__parent.a_storm_labels,
[self.__parent.expected_noise_lbl]
self._owner.switchable_r_labels,
self._owner.switchable_s_labels,
self._owner.switchable_g_now_labels,
self._owner.switchable_g_today_labels,
self._owner.k_storm_labels,
self._owner.a_storm_labels,
[self._owner.expected_noise_lbl]
)
)
)
self.__space_weather_labels.set(
self._space_weather_labels.set(
"switch_on_colors",
ThemeConstants.DEFAULT_ON_COLORS
)
self.__space_weather_labels.set(
self._space_weather_labels.set(
"switch_off_colors", ThemeConstants.DEFAULT_OFF_COLORS
)
self.__theme_names = {}
self.__detect_themes()
self._theme_names = {}
def __refresh_range_labels(self):
self.__parent.set_acf_interval_label()
self.__parent.set_band_filter_label(
self.__parent.activate_low_band_filter_btn,
self.__parent.lower_band_spinbox,
self.__parent.lower_band_filter_unit,
self.__parent.lower_band_confidence,
self.__parent.activate_up_band_filter_btn,
self.__parent.upper_band_spinbox,
self.__parent.upper_band_filter_unit,
self.__parent.upper_band_confidence,
self.__parent.band_range_lbl
def _refresh_range_labels(self):
"""Refresh the range-labels."""
self._owner.set_acf_interval_label()
self._owner.set_band_filter_label(
self._owner.activate_low_band_filter_btn,
self._owner.lower_band_spinbox,
self._owner.lower_band_filter_unit,
self._owner.lower_band_confidence,
self._owner.activate_up_band_filter_btn,
self._owner.upper_band_spinbox,
self._owner.upper_band_filter_unit,
self._owner.upper_band_confidence,
self._owner.band_range_lbl
)
self.__parent.set_band_filter_label(
self.__parent.activate_low_freq_filter_btn,
self.__parent.lower_freq_spinbox,
self.__parent.lower_freq_filter_unit,
self.__parent.lower_freq_confidence,
self.__parent.activate_up_freq_filter_btn,
self.__parent.upper_freq_spinbox,
self.__parent.upper_freq_filter_unit,
self.__parent.upper_freq_confidence,
self.__parent.freq_range_lbl
self._owner.set_band_filter_label(
self._owner.activate_low_freq_filter_btn,
self._owner.lower_freq_spinbox,
self._owner.lower_freq_filter_unit,
self._owner.lower_freq_confidence,
self._owner.activate_up_freq_filter_btn,
self._owner.upper_freq_spinbox,
self._owner.upper_freq_filter_unit,
self._owner.upper_freq_confidence,
self._owner.freq_range_lbl
)
@pyqtSlot()
def __apply(self, theme_path):
self.__theme_path = theme_path
def _apply(self, theme_path):
"""Apply the selected theme.
Refresh all relevant widgets.
Display a QMessageBox if the theme is not found."""
self._theme_path = theme_path
if os.path.exists(theme_path):
if self.__theme_path != self.__current_theme:
self.__change()
self.__parent.display_specs(
item=self.__parent.result_list.currentItem(),
if self._theme_path != self._current_theme:
self._change()
self._owner.display_specs(
item=self._owner.signals_list.currentItem(),
previous_item=None
)
self.__refresh_range_labels()
self.__parent.audio_widget.refresh_btns_colors(
self.__parent.active_color,
self.__parent.inactive_color
self._refresh_range_labels()
self._owner.audio_widget.refresh_btns_colors(
self._owner.active_color,
self._owner.inactive_color
)
self.__space_weather_labels.refresh()
self._space_weather_labels.refresh()
else:
pop_up(self.__parent, title=ThemeConstants.THEME_NOT_FOUND,
pop_up(self._owner, title=ThemeConstants.THEME_NOT_FOUND,
text=ThemeConstants.MISSING_THEME).show()
def __pretty_name(self, bad_name):
def _pretty_name(self, bad_name):
"""Return a well-formatted theme name."""
return ' '.join(
map(lambda s: s.capitalize(),
map(
lambda s: s.capitalize(),
bad_name.split('_')
)
)
def __detect_themes(self):
def _detect_themes(self):
"""Detect all available themes.
Connect all the actions to change the theme.
Display a QMessageBox if the theme folder is not found."""
themes = []
ag = QActionGroup(self.__parent, exclusive=True)
ag = QActionGroup(self._owner, exclusive=True)
if os.path.exists(ThemeConstants.FOLDER):
for theme_folder in sorted(os.listdir(ThemeConstants.FOLDER)):
relative_folder = os.path.join(ThemeConstants.FOLDER, theme_folder)
@@ -131,115 +218,64 @@ class ThemeManager:
relative_folder = os.path.join(ThemeConstants.FOLDER, theme_folder)
themes.append(relative_folder)
for theme_path in themes:
theme_name = '&' + self.__pretty_name(os.path.basename(theme_path))
theme_name = '&' + self._pretty_name(os.path.basename(theme_path))
new_theme = ag.addAction(
QAction(
theme_name,
self.__parent, checkable=True
self._owner, checkable=True
)
)
self.__parent.menu_themes.addAction(new_theme)
self.__theme_names[theme_name.lstrip('&')] = new_theme
new_theme.triggered.connect(partial(self.__apply, theme_path))
self._owner.menu_themes.addAction(new_theme)
self._theme_names[theme_name.lstrip('&')] = new_theme
new_theme.triggered.connect(partial(self._apply, theme_path))
else:
pop_up(self.__parent, title=ThemeConstants.THEME_FOLDER_NOT_FOUND,
pop_up(self._owner, title=ThemeConstants.THEME_FOLDER_NOT_FOUND,
text=ThemeConstants.MISSING_THEME_FOLDER).show()
def __is_valid_html_color(self, colors):
pattern = "#([a-zA-Z0-9]){6}"
match_ok = lambda col: bool(re.match(pattern, col))
if isinstance(colors, list):
if len(colors) > 1:
return all(match_ok(c) for c in colors)
else:
return match_ok(colors[0])
else:
return match_ok(colors)
def _change(self):
"""Change the current theme.
def __change(self):
theme_name = os.path.basename(self.__theme_path) + ThemeConstants.EXTENSION
Apply the stylesheet and set active and inactive colors.
Set all the new images needed.
Save the new current theme on file."""
theme_name = os.path.basename(self._theme_path) + ThemeConstants.EXTENSION
try:
with open(
os.path.join(self.__theme_path, theme_name), "r"
) as stylesheet:
with open(os.path.join(self._theme_path, theme_name), "r") as stylesheet:
style = stylesheet.read()
self.__parent.setStyleSheet(style)
self.__parent.download_window.setStyleSheet(style)
self._owner.setStyleSheet(style)
self._owner.download_window.setStyleSheet(style)
except FileNotFoundError:
pop_up(self.__parent, title=ThemeConstants.THEME_NOT_FOUND,
pop_up(self._owner, title=ThemeConstants.THEME_NOT_FOUND,
text=ThemeConstants.MISSING_THEME).show()
else:
icons_path = os.path.join(self.__theme_path, ThemeConstants.ICONS_FOLDER)
default_icons_path = os.path.join(
ThemeConstants.FOLDER,
ThemeConstants.DEFAULT,
ThemeConstants.ICONS_FOLDER
)
icons_path = os.path.join(self._theme_path, ThemeConstants.ICONS_FOLDER)
if os.path.exists(os.path.join(icons_path, Constants.NOT_SELECTED)) and \
os.path.exists(os.path.join(icons_path, Constants.NOT_AVAILABLE)):
self.__parent.default_images_folder = icons_path
else:
self.__parent.default_images_folder = default_icons_path
path_to_search_label = os.path.join(
icons_path,
Constants.SEARCH_LABEL_IMG
)
default_search_label = os.path.join(
default_icons_path,
Constants.SEARCH_LABEL_IMG
)
path_to_search_label = os.path.join(icons_path, Constants.SEARCH_LABEL_IMG)
if os.path.exists(path_to_search_label):
self.__parent.search_label.setPixmap(
QPixmap(path_to_search_label)
)
self.__parent.modulation_search_label.setPixmap(
QPixmap(path_to_search_label)
)
self.__parent.location_search_label.setPixmap(
QPixmap(path_to_search_label)
)
path = path_to_search_label
else:
self.__parent.search_label.setPixmap(
QPixmap(default_search_label)
)
self.__parent.modulation_search_label.setPixmap(
QPixmap(default_search_label)
)
self.__parent.location_search_label.setPixmap(
QPixmap(default_search_label)
)
path = ThemeConstants.DEFAULT_SEARCH_LABEL_PATH
self.__parent.search_label.setScaledContents(True)
self.__parent.modulation_search_label.setScaledContents(True)
self.__parent.location_search_label.setScaledContents(True)
self._owner.search_label.setPixmap(QPixmap(path))
self._owner.modulation_search_label.setPixmap(QPixmap(path))
self._owner.location_search_label.setPixmap(QPixmap(path))
path_to_volume_label = os.path.join(
icons_path,
Constants.VOLUME_LABEL_IMG
)
default_volume_label = os.path.join(
default_icons_path,
Constants.VOLUME_LABEL_IMG
)
self._owner.search_label.setScaledContents(True)
self._owner.modulation_search_label.setScaledContents(True)
self._owner.location_search_label.setScaledContents(True)
path_to_volume_label = os.path.join(icons_path, Constants.VOLUME_LABEL_IMG)
if os.path.exists(path_to_volume_label):
self.__parent.volume_label.setPixmap(
QPixmap(path_to_volume_label)
)
path = path_to_volume_label
else:
self.__parent.volume_label.setPixmap(
QPixmap(default_volume_label)
)
path = ThemeConstants.DEFAULT_VOLUME_LABEL_PATH
self.__parent.volume_label.setScaledContents(True)
self._owner.volume_label.setPixmap(QPixmap(path))
self._owner.volume_label.setScaledContents(True)
path_to_colors = os.path.join(
self.__theme_path,
ThemeConstants.COLORS
)
path_to_colors = os.path.join(self._theme_path, ThemeConstants.COLORS)
active_color_ok = False
inactive_color_ok = False
@@ -249,100 +285,84 @@ class ThemeManager:
if os.path.exists(path_to_colors):
with open(path_to_colors, "r") as colors_file:
for line in colors_file:
if ThemeConstants.COLOR_SEPARATOR in line:
quality, color = line.split(ThemeConstants.COLOR_SEPARATOR)
color = color.rstrip()
color_len = 1
if ',' in color:
color = [c.strip() for c in color.split(',')]
color_len = len(color)
if self.__is_valid_html_color(color):
if color_len == 1:
if quality.lower() == Constants.ACTIVE:
self.__parent.active_color = color
color_handler = _ColorsHandler.from_file(colors_file.read())
if color_handler is not None:
for color in color_handler.simple_color_list:
if color.quality == Constants.ACTIVE:
self._owner.active_color = color.color_str
active_color_ok = True
if quality.lower() == Constants.INACTIVE:
self.__parent.inactive_color = color
if color.quality == Constants.INACTIVE:
self._owner.inactive_color = color.color_str
inactive_color_ok = True
if quality.lower() == Constants.TEXT_COLOR:
if color.quality == Constants.TEXT_COLOR:
text_color_ok = True
self.__space_weather_labels.set(
self._space_weather_labels.set(
"text_color",
color
color.color_str
)
if color_len == 2:
if quality.lower() == Constants.LABEL_ON_COLOR:
for color in color_handler.double_color_list:
if color.quality == Constants.LABEL_ON_COLOR:
switch_on_color_ok = True
self.__space_weather_labels.set(
self._space_weather_labels.set(
"switch_on_colors",
color
color.color_list
)
if quality.lower() == Constants.LABEL_OFF_COLOR:
if color.quality == Constants.LABEL_OFF_COLOR:
switch_off_color_ok = True
self.__space_weather_labels.set(
self._space_weather_labels.set(
"switch_off_colors",
color
color.color_list
)
if not (active_color_ok and inactive_color_ok):
self.__parent.active_color = ThemeConstants.DEFAULT_ACTIVE_COLOR
self.__parent.inactive_color = ThemeConstants.DEFAULT_INACTIVE_COLOR
self._owner.active_color = ThemeConstants.DEFAULT_ACTIVE_COLOR
self._owner.inactive_color = ThemeConstants.DEFAULT_INACTIVE_COLOR
if not (switch_on_color_ok and switch_off_color_ok):
self.__space_weather_labels.set(
self._space_weather_labels.set(
"switch_on_colors",
ThemeConstants.DEFAULT_ON_COLORS
)
self.__space_weather_labels.set(
self._space_weather_labels.set(
"switch_off_colors",
ThemeConstants.DEFAULT_OFF_COLORS
)
if not text_color_ok:
self.__space_weather_labels.set(
self._space_weather_labels.set(
"text_color",
ThemeConstants.DEFAULT_TEXT_COLOR
)
self.__current_theme = self.__theme_path
self._current_theme = self._theme_path
try:
with open(os.path.join(
ThemeConstants.FOLDER,
ThemeConstants.CURRENT
), "w") as current_theme:
current_theme.write(self.__theme_path)
with open(ThemeConstants.CURRENT_THEME_FILE, "w") as current_theme:
current_theme.write(self._theme_path)
except Exception:
pass
def start(self):
current_theme_file = os.path.join(
ThemeConstants.FOLDER,
ThemeConstants.CURRENT
)
if os.path.exists(current_theme_file):
with open(current_theme_file, "r") as current_theme_path:
"""Start the theme manager."""
self._detect_themes()
if os.path.exists(ThemeConstants.CURRENT_THEME_FILE):
with open(ThemeConstants.CURRENT_THEME_FILE, "r") as current_theme_path:
theme_path = current_theme_path.read()
theme_name = self.__pretty_name(os.path.basename(theme_path))
theme_name = self._pretty_name(os.path.basename(theme_path))
try:
self.__theme_names[theme_name].setChecked(True)
self._theme_names[theme_name].setChecked(True)
except Exception:
pop_up(self.__parent, title=ThemeConstants.THEME_NOT_FOUND,
pop_up(self._owner, title=ThemeConstants.THEME_NOT_FOUND,
text=ThemeConstants.MISSING_THEME).show()
else:
self.__apply(theme_path)
self._apply(theme_path)
else:
try:
self.__theme_names[
self.__pretty_name(ThemeConstants.DEFAULT)
self._theme_names[
self._pretty_name(ThemeConstants.DEFAULT)
].setChecked(True)
except Exception:
pop_up(self.__parent, title=ThemeConstants.THEME_NOT_FOUND,
pop_up(self._owner, title=ThemeConstants.THEME_NOT_FOUND,
text=ThemeConstants.MISSING_THEME).show()
else:
self.__apply(
os.path.join(
ThemeConstants.FOLDER,
ThemeConstants.DEFAULT
)
)
self._apply(ThemeConstants.DEFAULT_THEME_PATH)

View File

@@ -14,6 +14,8 @@ from utilities import checksum_ok
class ThreadStatus(Enum):
"""Possible thread status."""
OK = auto()
NO_CONNECTION_ERR = auto()
UNKNOWN_ERR = auto()
@@ -22,62 +24,88 @@ class ThreadStatus(Enum):
class BaseDownloadThread(QThread):
"""Subclass QThread. Base class for the download threads."""
def __init__(self, parent=None):
"""Set the status to 'UNDEFINED'."""
super().__init__(parent)
self.status = ThreadStatus.UNDEFINED
def __del__(self):
"""Force the termination of the thread."""
self.terminate()
self.wait()
class DownloadThread(BaseDownloadThread):
"""Subclass BaseDownloadThread. Download the database, images and audio samples."""
progress = pyqtSignal(int, float)
CHUNK = 1024**2
_CHUNK = 512 * 1024
_MEGA = 1024**2
def __init__(self):
"""Just call super().__init__."""
self._db = None
self._exit_call = False
super().__init__()
def __pretty_len(self, byte_obj):
mega = len(byte_obj) / self.CHUNK
def _pretty_len(self, byte_obj):
"""Return a well-formatted number of downloaded MB."""
mega = len(byte_obj) / self._MEGA
if mega.is_integer():
return int(mega)
else:
return ceil(mega)
def __get_download_speed(self, data, delta):
def _get_download_speed(self, data, delta):
"""Return the download speed in MB/s."""
return round(
(len(data) / self.CHUNK) / delta,
2
(len(data) / self._MEGA) / delta, 2
)
def set_exit(self):
self._exit_call = True
def run(self):
"""Override QThread.run. Download the database, images and audio samples.
Handle all possible exceptions. Also extract the files
in the local folder."""
self.status = ThreadStatus.UNDEFINED
self._db = None
raw_data = bytes(0)
try:
db = urllib3.PoolManager().request(
self._db = urllib3.PoolManager().request(
'GET',
Database.LINK_LOC,
preload_content=False
preload_content=False,
timeout=4.0
)
while True:
start = time()
data = db.read(self.CHUNK)
try:
data = self._db.read(self._CHUNK)
except Exception:
raise
else:
delta = time() - start
if not data:
break
raw_data += data
self.progress.emit(
self.__pretty_len(raw_data),
self.__get_download_speed(data, delta)
self._pretty_len(raw_data),
self._get_download_speed(data, delta)
)
db.release_conn()
if self._exit_call:
self._exit_call = False
self._db.release_conn()
return
except Exception: # No internet connection.
db.release_conn()
self._db.release_conn()
self.status = ThreadStatus.NO_CONNECTION_ERR
return
if db.status != 200:
if self._db.status != 200:
self.status = ThreadStatus.BAD_DOWNLOAD_ERR
return
try:
@@ -102,42 +130,54 @@ class DownloadThread(BaseDownloadThread):
class _AsyncDownloader:
"""Mixin class for asynchronous threads."""
async def _download_resource(self, session, link):
"""Return the content of 'link' as bytes."""
resp = await session.get(link)
return await resp.read()
class UpdateSpaceWeatherThread(BaseDownloadThread, _AsyncDownloader):
"""Subclass BaseDownloadThread. Downlaod the space weather data."""
__properties = ("xray", "prot_el", "ak_index", "sgas", "geo_storm")
_PROPERTIES = ("xray", "prot_el", "ak_index", "sgas", "geo_storm")
def __init__(self, space_weather_data):
"""Initialize the a local space_weather_data."""
super().__init__()
self.__space_weather_data = space_weather_data
self._space_weather_data = space_weather_data
async def __download_property(self, session, property_name):
async def _download_property(self, session, property_name):
"""Download the data conteining the information of a specific property."""
link = getattr(Constants, "SPACE_WEATHER_" + property_name.upper())
data = await self._download_resource(session, link)
setattr(self.__space_weather_data, property_name, str(data, 'utf-8'))
setattr(self._space_weather_data, property_name, str(data, 'utf-8'))
async def __download_image(self, session, n):
im = await self._download_resource(session, Constants.SPACE_WEATHER_IMGS[n])
self.__space_weather_data.images[n].loadFromData(im)
async def _download_image(self, session, n):
"""Download the data corresponding the n-th image displayed in the screen."""
im = await self._download_resource(
session, Constants.SPACE_WEATHER_IMGS[n]
)
self._space_weather_data.images[n].loadFromData(im)
async def _download_resources(self):
"""Download all the data."""
session = aiohttp.ClientSession()
try:
t = []
for p in self.__properties:
for p in self._PROPERTIES:
t.append(
asyncio.create_task(self.__download_property(session, p))
asyncio.create_task(self._download_property(session, p))
)
tot_images = range(len(Constants.SPACE_WEATHER_IMGS))
t1 = []
for im_number in tot_images:
t1.append(
asyncio.create_task(self.__download_image(session, im_number))
asyncio.create_task(
self._download_image(session, im_number)
)
)
await asyncio.gather(*t, *t1)
except Exception:
@@ -148,41 +188,47 @@ class UpdateSpaceWeatherThread(BaseDownloadThread, _AsyncDownloader):
await session.close()
def run(self):
"""Override QThread.run. Start the download of the data."""
self.status = ThreadStatus.UNDEFINED
asyncio.run(self._download_resources())
class UpdateForecastThread(BaseDownloadThread, _AsyncDownloader):
"""Subclass BaseDownloadThread. Download the forecast data."""
class _PropertyName(Enum):
"""Enum used to differentiate between the two data needed."""
FORECAST = auto()
PROBABILITIES = auto()
def __init__(self, parent):
def __init__(self, owner):
"""Set the owner object (a ForecastData instance)."""
super().__init__()
self.parent = parent
self.owner = owner
async def __download_property(self, session, link, prop_name):
async def _download_property(self, session, link, prop_name):
"""Download the data from 'link' and set the corresponding property of the owner."""
resp = await self._download_resource(session, link)
resp = str(resp, 'utf-8')
if prop_name is self._PropertyName.FORECAST:
self.parent.forecast = resp
self.owner.forecast = resp
if prop_name is self._PropertyName.PROBABILITIES:
self.parent.probabilities = resp
self.owner.probabilities = resp
async def _download_resources(self):
"""Download all the data needed."""
session = aiohttp.ClientSession()
try:
await asyncio.gather(
asyncio.create_task(
self.__download_property(
self._download_property(
session,
Constants.SPACE_WEATHER_GEO_STORM,
self._PropertyName.FORECAST
)
),
asyncio.create_task(
self.__download_property(
self._download_property(
session,
Constants.FORECAST_PROBABILITIES,
self._PropertyName.PROBABILITIES
@@ -197,5 +243,6 @@ class UpdateForecastThread(BaseDownloadThread, _AsyncDownloader):
await session.close()
def run(self):
"""Override QThread.run. Start the data download."""
self.status = ThreadStatus.UNDEFINED
asyncio.run(self._download_resources())

View File

@@ -8,23 +8,35 @@ from PyQt5.QtWidgets import QMessageBox
from constants import Constants, Signal, Database, ChecksumWhat
def resource_path(relative_path):
"""Get absolute path to resource, works for dev and for PyInstaller."""
try:
base_path = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
def uncheck_and_emit(button):
"""Set the button to the unchecked state and emit the clicked signal."""
if button.isChecked():
button.setChecked(False)
button.clicked.emit()
def pop_up(cls, title, text,
informative_text=None,
connection=None,
is_question=False,
default_btn=QMessageBox.Yes):
"""Return a QMessageBox object.
Keyword arguments:
informative_text -- possible informative text to be displayed.
connection -- a callable to connect the message when emitting the finished signal.
is_question -- whether the message contains a question.
default_btn -- the default button for the possible answer to the question."""
msg = QMessageBox(cls)
msg.setWindowTitle(title)
msg.setText(text)
@@ -38,7 +50,9 @@ def pop_up(cls, title, text,
msg.adjustSize()
return msg
def checksum_ok(data, what):
"""Check whether the checksum of the 'data' argument is correct."""
code = hashlib.sha256()
code.update(data)
if what is ChecksumWhat.FOLDER:
@@ -56,7 +70,11 @@ def checksum_ok(data, what):
raise
return code.hexdigest() == reference
def connect_events_to_func(events_to_connect, fun_to_connect, fun_args):
"""Connect all elements of events_to_connect to the callable fun_to_connect.
fun_args is a list of fun_to_connect arguments."""
if fun_args is not None:
for event in events_to_connect:
event.connect(partial(fun_to_connect, *fun_args))
@@ -64,22 +82,30 @@ def connect_events_to_func(events_to_connect, fun_to_connect, fun_args):
for event in events_to_connect:
event.connect(fun_to_connect)
def filters_limit(spinbox, filter_unit, confidence, sign=1):
"""Return the actual limit of a numerical filter."""
band_filter = spinbox.value() * Constants.CONVERSION_FACTORS[filter_unit.currentText()]
return band_filter + sign * (confidence.value() * band_filter) // 100
def is_undef_freq(current_signal):
"""Return whether the lower or upper frequency of a signal is undefined."""
lower_freq = current_signal.at[Signal.INF_FREQ]
upper_freq = current_signal.at[Signal.SUP_FREQ]
return lower_freq == Constants.UNKNOWN or upper_freq == Constants.UNKNOWN
def is_undef_band(current_signal):
"""Return whether the lower or upper band of a signal is undefined."""
lower_band = current_signal.at[Signal.INF_BAND]
upper_band = current_signal.at[Signal.SUP_BAND]
return lower_band == Constants.UNKNOWN or upper_band == Constants.UNKNOWN
def _change_unit(num):
digits = len(num)
def _change_unit(str_num):
"""Return a scale factor given the number of digits of a numeric string."""
digits = len(str_num)
if digits < 4:
return 1
elif digits < 7:
@@ -89,14 +115,16 @@ def _change_unit(num):
else:
return 10**9
def format_numbers(lower, upper):
"""Return the string which displays the numeric limits of a filter."""
units = {1: 'Hz', 1000: 'kHz', 10**6: 'MHz', 10**9: 'GHz'}
lower_factor = _change_unit(lower)
upper_factor = _change_unit(upper)
pre_lower = lower
pre_upper = upper
lower = int(lower) / lower_factor
upper = int(upper) / upper_factor
lower = safe_cast(lower, int) / lower_factor
upper = safe_cast(upper, int) / upper_factor
if lower.is_integer():
lower = int(lower)
else:
@@ -109,3 +137,19 @@ def format_numbers(lower, upper):
return f"{lower:,} {units[lower_factor]} - {upper:,} {units[upper_factor]}"
else:
return f"{lower:,} {units[lower_factor]}"
def safe_cast(value, cast_type, default=-1):
"""Call 'cast_type(value)' and return the result.
If the operation fails return 'default'.
Should be used to perform 'safe casts'.
Keyword argument:
default -- default value returned if the cast fails.
"""
try:
r = cast_type(value)
except Exception:
r = default
finally:
return r

View File

@@ -7,27 +7,39 @@ from threads import (BaseDownloadThread,
UpdateForecastThread)
from constants import Constants
from switchable_label import MultiColorSwitchableLabel
from utilities import safe_cast
class _BaseWeatherData(QObject):
"""Base class for the weather data. Extends QObject."""
update_complete = pyqtSignal(bool)
def __init__(self):
"""Create a BaseDownloadThread object."""
super().__init__()
self._update_thread = BaseDownloadThread()
@property
def is_updating(self):
"""Return whether the thread is running."""
return self._update_thread.isRunning()
def update(self):
"""Start the thread."""
self._update_thread.start()
def _parse_data(self):
"""Dummy function. Must be overrided by subclasses."""
pass
@pyqtSlot()
def _parse_and_emit_signal(self):
"""Parse the data and emit an 'update_complete' signal.
If the download was not successful, do not parse the data.
The 'update_complete' signal propagates the thread status up to the
calling slot."""
status_ok = False
if self._update_thread.status is ThreadStatus.OK:
status_ok = True
@@ -35,15 +47,22 @@ class _BaseWeatherData(QObject):
self.update_complete.emit(status_ok)
def _double_split(self, string):
"""Given a string, return a list of lists.
First split on each line. Then split each line on whitespaces."""
return [i.split() for i in string.splitlines()]
def shutdown_thread(self):
"""Terminate the download thread."""
self._update_thread.terminate()
self._update_thread.wait()
class SpaceWeatherData(_BaseWeatherData):
"""Space weather class. Extends _BaseWeatherData."""
def __init__(self):
"""Set all attributes and connect the thread to _parse_and_emit_signal."""
super().__init__()
self.xray = ''
self.prot_el = ''
@@ -65,6 +84,9 @@ class SpaceWeatherData(_BaseWeatherData):
self._update_thread.finished.connect(self._parse_and_emit_signal)
def _parse_data(self):
"""Override _BaseWeatherData._parse_data.
Set all the data."""
self.xray = self._double_split(self.xray)
self.prot_el = self._double_split(self.prot_el)
self.ak_index = self._double_split(self.ak_index)
@@ -72,6 +94,7 @@ class SpaceWeatherData(_BaseWeatherData):
self.geo_storm = self._double_split(self.geo_storm)
def remove_data(self):
"""Remove the reference to all the data."""
self.xray = ''
self.prot_el = ''
self.ak_index = ''
@@ -90,7 +113,105 @@ class SpaceWeatherData(_BaseWeatherData):
]
def _make_labels_table(forecast, probabilities, rows):
"""Organize all the arguments to feed _get_lbl_value."""
def get_first_split(x):
return x.split("/")[0]
def get_second_split(x):
return x.split("/")[1]
def get_third_split(x):
return x.split("/")[2]
solar_row = rows["solar_row"]
event_row = rows["event_row"]
rb_now_row = rows["rb_now_row"]
ga_now_row = rows["ga_now_row"]
kp_index_row = rows["kp_index_row"]
return [
[
[forecast, solar_row, 3, None],
[probabilities, event_row + 1, 2, get_first_split],
[probabilities, event_row + 2, 2, get_first_split],
[probabilities, event_row + 3, 1, get_first_split],
[forecast, rb_now_row, 1, None],
[forecast, rb_now_row + 1, 3, None],
[probabilities, ga_now_row + 2, 1, get_first_split],
[probabilities, ga_now_row + 3, 2, get_first_split],
[probabilities, ga_now_row + 4, 2, get_first_split],
[probabilities, ga_now_row + 6, 1, get_first_split],
[probabilities, ga_now_row + 7, 2, get_first_split],
[probabilities, ga_now_row + 8, 2, get_first_split],
[forecast, kp_index_row + 3, 1, None],
[forecast, kp_index_row + 4, 1, None],
[forecast, kp_index_row + 5, 1, None],
[forecast, kp_index_row + 6, 1, None],
[forecast, kp_index_row + 7, 1, None],
[forecast, kp_index_row + 8, 1, None],
[forecast, kp_index_row + 9, 1, None],
[forecast, kp_index_row + 10, 1, None]
],
[
[forecast, solar_row, 4, None],
[probabilities, event_row + 1, 2, get_second_split],
[probabilities, event_row + 2, 2, get_second_split],
[probabilities, event_row + 3, 1, get_second_split],
[forecast, rb_now_row, 2, None],
[forecast, rb_now_row + 1, 4, None],
[probabilities, ga_now_row + 2, 1, get_second_split],
[probabilities, ga_now_row + 3, 2, get_second_split],
[probabilities, ga_now_row + 4, 2, get_second_split],
[probabilities, ga_now_row + 6, 1, get_second_split],
[probabilities, ga_now_row + 7, 2, get_second_split],
[probabilities, ga_now_row + 8, 2, get_second_split],
[forecast, kp_index_row + 3, 2, None],
[forecast, kp_index_row + 4, 2, None],
[forecast, kp_index_row + 5, 2, None],
[forecast, kp_index_row + 6, 2, None],
[forecast, kp_index_row + 7, 2, None],
[forecast, kp_index_row + 8, 2, None],
[forecast, kp_index_row + 9, 2, None],
[forecast, kp_index_row + 10, 2, None]
],
[
[forecast, solar_row, 5, None],
[probabilities, event_row + 1, 2, get_third_split],
[probabilities, event_row + 2, 2, get_third_split],
[probabilities, event_row + 3, 1, get_third_split],
[forecast, rb_now_row, 3, None],
[forecast, rb_now_row + 1, 5, None],
[probabilities, ga_now_row + 2, 1, get_third_split],
[probabilities, ga_now_row + 3, 2, get_third_split],
[probabilities, ga_now_row + 4, 2, get_third_split],
[probabilities, ga_now_row + 6, 1, get_third_split],
[probabilities, ga_now_row + 7, 2, get_third_split],
[probabilities, ga_now_row + 8, 2, get_third_split],
[forecast, kp_index_row + 3, 3, None],
[forecast, kp_index_row + 4, 3, None],
[forecast, kp_index_row + 5, 3, None],
[forecast, kp_index_row + 6, 3, None],
[forecast, kp_index_row + 7, 3, None],
[forecast, kp_index_row + 8, 3, None],
[forecast, kp_index_row + 9, 3, None],
[forecast, kp_index_row + 10, 3, None]
]
]
def _get_lbl_value(data, row, col, f=None):
"""Return the well-formatted string-value of the label."""
val = data[row][col]
if f is not None:
val = f(val)
val = val.rstrip('%')
if len(val) > 1:
val = val.lstrip('0')
return val
class ForecastData(_BaseWeatherData):
"""3-day forecast class. Extends _BaseWeatherData."""
ROW_KEYWORDS = {
"solar_row": "S1 or greater",
@@ -100,49 +221,44 @@ class ForecastData(_BaseWeatherData):
"kp_index_row": "NOAA Kp index breakdown"
}
def __init__(self, parent):
LABELS_PER_COLUMN = 20
def __init__(self, owner):
"""Initialize all attributes and connect the thread to _parse_and_emit_signal."""
super().__init__()
self.forecast = ''
self.probabilities = ''
self.__labels_table = []
self.__solar_row = None
self.__event_row = None
self.__rb_now_row = None
self.__ga_now_row = None
self.__kp_index_row = None
self.forecast = []
self.probabilities = []
self._update_thread = UpdateForecastThread(self)
self._update_thread.finished.connect(self._parse_and_emit_signal)
self.today_lbl = parent.today_lbl
self.today_p1_lbl = parent.today_p1_lbl
self.today_p2_lbl = parent.today_p2_lbl
self.__today_lbls = []
self.__today_p1_lbls = []
self.__today_p2_lbls = []
self.__all_lbls = []
self._today_lbl = owner.today_lbl
self._today_p1_lbl = owner.today_p1_lbl
self._today_p2_lbl = owner.today_p2_lbl
today_lbls = []
today_p1_lbls = []
today_p2_lbls = []
flags = ['', 'p1_', 'p2_']
for flag in flags:
title_lbl = getattr(self, "today_" + flag + "lbl")
title_lbl = getattr(self, "_today_" + flag + "lbl")
title_lbl.setText("-")
for index in range(20):
for index in range(self.LABELS_PER_COLUMN):
label = getattr(
parent,
owner,
"forecast_today_" + flag + str(index) + "_lbl"
)
label.setText(Constants.UNKNOWN)
if flag == flags[0]:
self.__today_lbls.append(label)
today_lbls.append(label)
if flag == flags[1]:
self.__today_p1_lbls.append(label)
today_p1_lbls.append(label)
if flag == flags[2]:
self.__today_p2_lbls.append(label)
today_p2_lbls.append(label)
self.__all_lbls = [
self.__today_lbls,
self.__today_p1_lbls,
self.__today_p2_lbls
]
self._all_lbls = [today_lbls, today_p1_lbls, today_p2_lbls]
def _parse_data(self):
"""Override _BaseWeatherData._parse_data.
Set all the relevant data."""
# Remove possible '(G\d)' from the kp_index table
self.forecast = re.sub(
'\(G\d\)', lambda obj: '', self.forecast
@@ -153,167 +269,85 @@ class ForecastData(_BaseWeatherData):
)
self.probabilities = self.probabilities.splitlines()
def __split_lists(self):
self.forecast = [i.split() for i in self.forecast]
self.probabilities = [i.split() for i in self.probabilities]
def _split_lists(self):
"""Split the elements of forecast and probabilities."""
return [i.split() for i in self.forecast], [i.split() for i in self.probabilities]
def __find_row_with(self, data, text):
def _find_row_with(self, data, text):
"""Given a list of strings, return the index of the first string containing the target text."""
for i, row in enumerate(data):
if text in row:
return i
return None
def __get_rows(self):
self.__solar_row = self.__find_row_with(
def _get_rows(self):
"""Get all the rows needed for updating the screen.
Raise an exception if something goes wrong."""
rows = {}
rows["solar_row"] = self._find_row_with(
self.forecast,
self.ROW_KEYWORDS["solar_row"]
)
self.__event_row = self.__find_row_with(
rows["event_row"] = self._find_row_with(
self.probabilities,
self.ROW_KEYWORDS["event_row"]
)
self.__rb_now_row = self.__find_row_with(
rows["rb_now_row"] = self._find_row_with(
self.forecast,
self.ROW_KEYWORDS["rb_now_row"]
)
self.__ga_now_row = self.__find_row_with(
rows["ga_now_row"] = self._find_row_with(
self.probabilities,
self.ROW_KEYWORDS["ga_now_row"]
)
self.__kp_index_row = self.__find_row_with(
rows["kp_index_row"] = self._find_row_with(
self.forecast,
self.ROW_KEYWORDS["kp_index_row"]
)
is_none = lambda x: x is None
if any([
is_none(self.__solar_row),
is_none(self.__event_row),
is_none(self.__rb_now_row),
is_none(self.__ga_now_row),
is_none(self.__kp_index_row)
]):
if any(row is None for row in rows.values()):
raise Exception('Missing Rows')
def __set_dates(self):
month = self.forecast[self.__solar_row - 1][0]
today = self.forecast[self.__solar_row - 1][1]
today_p1 = self.forecast[self.__solar_row - 1][3]
today_p2 = self.forecast[self.__solar_row - 1][5]
self.today_lbl.setText(month + ' ' + today)
self.today_p1_lbl.setText(month + ' ' + today_p1)
self.today_p2_lbl.setText(month + ' ' + today_p2)
def __make_labels_table(self):
get_first_split = lambda x: x.split("/")[0]
get_second_split = lambda x: x.split("/")[1]
get_third_split = lambda x: x.split("/")[2]
self.__labels_table = [
[
[self.forecast, self.__solar_row, 3, None],
[self.probabilities, self.__event_row + 1, 2, get_first_split],
[self.probabilities, self.__event_row + 2, 2, get_first_split],
[self.probabilities, self.__event_row + 3, 1, get_first_split],
[self.forecast, self.__rb_now_row, 1, None],
[self.forecast, self.__rb_now_row + 1, 3, None],
[self.probabilities, self.__ga_now_row + 2, 1, get_first_split],
[self.probabilities, self.__ga_now_row + 3, 2, get_first_split],
[self.probabilities, self.__ga_now_row + 4, 2, get_first_split],
[self.probabilities, self.__ga_now_row + 6, 1, get_first_split],
[self.probabilities, self.__ga_now_row + 7, 2, get_first_split],
[self.probabilities, self.__ga_now_row + 8, 2, get_first_split],
[self.forecast, self.__kp_index_row + 3, 1, None],
[self.forecast, self.__kp_index_row + 4, 1, None],
[self.forecast, self.__kp_index_row + 5, 1, None],
[self.forecast, self.__kp_index_row + 6, 1, None],
[self.forecast, self.__kp_index_row + 7, 1, None],
[self.forecast, self.__kp_index_row + 8, 1, None],
[self.forecast, self.__kp_index_row + 9, 1, None],
[self.forecast, self.__kp_index_row + 10, 1, None]
],
[
[self.forecast, self.__solar_row, 4, None],
[self.probabilities, self.__event_row + 1, 2, get_second_split],
[self.probabilities, self.__event_row + 2, 2, get_second_split],
[self.probabilities, self.__event_row + 3, 1, get_second_split],
[self.forecast, self.__rb_now_row, 2, None],
[self.forecast, self.__rb_now_row + 1, 4, None],
[self.probabilities, self.__ga_now_row + 2, 1, get_second_split],
[self.probabilities, self.__ga_now_row + 3, 2, get_second_split],
[self.probabilities, self.__ga_now_row + 4, 2, get_second_split],
[self.probabilities, self.__ga_now_row + 6, 1, get_second_split],
[self.probabilities, self.__ga_now_row + 7, 2, get_second_split],
[self.probabilities, self.__ga_now_row + 8, 2, get_second_split],
[self.forecast, self.__kp_index_row + 3, 2, None],
[self.forecast, self.__kp_index_row + 4, 2, None],
[self.forecast, self.__kp_index_row + 5, 2, None],
[self.forecast, self.__kp_index_row + 6, 2, None],
[self.forecast, self.__kp_index_row + 7, 2, None],
[self.forecast, self.__kp_index_row + 8, 2, None],
[self.forecast, self.__kp_index_row + 9, 2, None],
[self.forecast, self.__kp_index_row + 10, 2, None]
],
[
[self.forecast, self.__solar_row, 5, None],
[self.probabilities, self.__event_row + 1, 2, get_third_split],
[self.probabilities, self.__event_row + 2, 2, get_third_split],
[self.probabilities, self.__event_row + 3, 1, get_third_split],
[self.forecast, self.__rb_now_row, 3, None],
[self.forecast, self.__rb_now_row + 1, 5, None],
[self.probabilities, self.__ga_now_row + 2, 1, get_third_split],
[self.probabilities, self.__ga_now_row + 3, 2, get_third_split],
[self.probabilities, self.__ga_now_row + 4, 2, get_third_split],
[self.probabilities, self.__ga_now_row + 6, 1, get_third_split],
[self.probabilities, self.__ga_now_row + 7, 2, get_third_split],
[self.probabilities, self.__ga_now_row + 8, 2, get_third_split],
[self.forecast, self.__kp_index_row + 3, 3, None],
[self.forecast, self.__kp_index_row + 4, 3, None],
[self.forecast, self.__kp_index_row + 5, 3, None],
[self.forecast, self.__kp_index_row + 6, 3, None],
[self.forecast, self.__kp_index_row + 7, 3, None],
[self.forecast, self.__kp_index_row + 8, 3, None],
[self.forecast, self.__kp_index_row + 9, 3, None],
[self.forecast, self.__kp_index_row + 10, 3, None]
]
]
def __get_lbl_value(self, data, row, col, f = None):
val = data[row][col]
if f is not None:
val = f(val)
val = val.lstrip('0').rstrip('%')
return val
def __is_integer(self, s):
try:
int(s)
except Exception:
return False
else:
return True
return rows
def __set_labels_values(self):
for lbl_list, table in zip(self.__all_lbls, self.__labels_table):
def _set_dates(self, forecast, solar_row):
"""Set the date labels."""
month = forecast[solar_row - 1][0]
today = forecast[solar_row - 1][1]
today_p1 = forecast[solar_row - 1][3]
today_p2 = forecast[solar_row - 1][5]
self._today_lbl.setText(month + ' ' + today)
self._today_p1_lbl.setText(month + ' ' + today_p1)
self._today_p2_lbl.setText(month + ' ' + today_p2)
def _set_labels_values(self, labels_table):
"""Set all the labels values."""
for lbl_list, table in zip(self._all_lbls, labels_table):
for lbl, row in zip(lbl_list, table):
lbl.switch_off()
value = self.__get_lbl_value(*row)
if self.__is_integer(value):
lbl.level = int(value)
value = _get_lbl_value(*row)
lbl.level = safe_cast(value, int)
if not isinstance(lbl, MultiColorSwitchableLabel):
value += '%'
lbl.setText(value)
lbl.switch_on()
def update_all_labels(self):
"""Update all the labels values.
If an exception is raised in the process, do nothing."""
try:
self.__get_rows()
self.__split_lists()
self.__make_labels_table()
self.__set_dates()
self.__set_labels_values()
rows = self._get_rows()
forecast, probabilities = self._split_lists()
labels_table = _make_labels_table(forecast, probabilities, rows)
self._set_dates(forecast, rows["solar_row"])
self._set_labels_values(labels_table)
except Exception:
pass
def remove_data(self):
self.forecast = ''
self.probabilities = ''
"""Remove the reference to the downloaded data."""
self.forecast = []
self.probabilities = []