Change all double leading underscores with sigle leading underscores.

Also make minor refactoring and stylistic changes.
This commit is contained in:
alessandro90
2019-06-01 18:07:37 +02:00
parent 3dea2a0e56
commit 10724e548a
12 changed files with 387 additions and 390 deletions

2
.flake8 Normal file
View File

@@ -0,0 +1,2 @@
[flake8]
ignore = E221, E501

2
.gitignore vendored
View File

@@ -9,5 +9,5 @@ themes/.current_theme
launch.bat
designer.bat
*.sh
.vscode/
default_pics/
.vscode/

View File

@@ -582,7 +582,7 @@ class Artemis(QMainWindow, Ui_MainWindow):
self.update_now_bar.set_idle()
if status_ok:
xray_long = safe_cast(self.space_weather_data.xray[-1][7], float)
format_text = lambda letter, power: letter + f"{xray_long * 10**power:.1f}"
def format_text(letter, power): return letter + f"{xray_long * 10**power:.1f}"
if xray_long < 1e-8 and xray_long != -1.00e+05:
self.peak_flux_lbl.setText(format_text("<A", 8))
elif xray_long >= 1e-8 and xray_long < 1e-7:

View File

@@ -14,7 +14,7 @@ class AudioPlayer(QObject):
method, set_audio_player, which loads the current file and refresh_btns_colors.
Everything else is managed internally."""
__time_step = 500 # Milliseconds.
_time_step = 500 # Milliseconds.
def __init__(self, play,
pause,
@@ -25,124 +25,124 @@ class AudioPlayer(QObject):
inactive_color):
"""Initialize the player."""
super().__init__()
self.__paused = False
self.__first_call = True
self.__play = play
self.__pause = pause
self.__stop = stop
self.__volume = volume
self.__audio_progress = audio_progress
self.__audio_file = None
self.__timer = QTimer()
self.__timer.timeout.connect(self.__update_bar)
self.__play.clicked.connect(self.__play_audio)
self.__pause.clicked.connect(self.__pause_audio)
self.__stop.clicked.connect(self.__stop_audio)
self.__volume.valueChanged.connect(self.__set_volume)
self.__play.setIconSize(self.__play.size())
self.__pause.setIconSize(self.__pause.size())
self.__stop.setIconSize(self.__stop.size())
self._paused = False
self._first_call = True
self._play = play
self._pause = pause
self._stop = stop
self._volume = volume
self._audio_progress = audio_progress
self._audio_file = None
self._timer = QTimer()
self._timer.timeout.connect(self._update_bar)
self._play.clicked.connect(self._play_audio)
self._pause.clicked.connect(self._pause_audio)
self._stop.clicked.connect(self._stop_audio)
self._volume.valueChanged.connect(self._set_volume)
self._play.setIconSize(self._play.size())
self._pause.setIconSize(self._pause.size())
self._stop.setIconSize(self._stop.size())
self.refresh_btns_colors(active_color, inactive_color)
def refresh_btns_colors(self, active_color, inactive_color):
"""Repaint the buttons of the widgetd after the theme has changed."""
self.__play.setIcon(qta.icon('fa5.play-circle',
self._play.setIcon(qta.icon('fa5.play-circle',
color=active_color,
color_disabled=inactive_color))
self.__pause.setIcon(qta.icon('fa5.pause-circle',
self._pause.setIcon(qta.icon('fa5.pause-circle',
color=active_color,
color_disabled=inactive_color))
self.__stop.setIcon(qta.icon('fa5.stop-circle',
self._stop.setIcon(qta.icon('fa5.stop-circle',
color=active_color,
color_disabled=inactive_color))
@pyqtSlot()
def __set_volume(self):
def _set_volume(self):
"""Set the volume of the audio samples."""
if mixer.get_init():
mixer.music.set_volume(
self.__volume.value() / self.__volume.maximum()
self._volume.value() / self._volume.maximum()
)
def __reset_audio_widget(self):
def _reset_audio_widget(self):
"""Reset the widget. Stop all playing samples."""
if mixer.get_init():
if mixer.music.get_busy():
mixer.music.stop()
self.__timer.stop()
self._timer.stop()
mixer.quit()
self.__audio_progress.reset()
self.__enable_buttons(False, False, False)
self.__paused = False
self._audio_progress.reset()
self._enable_buttons(False, False, False)
self._paused = False
@pyqtSlot()
def __update_bar(self):
def _update_bar(self):
"""Update the progress bar."""
pos = mixer.music.get_pos()
if pos == -1:
self.__timer.stop()
self.__audio_progress.reset()
self.__enable_buttons(True, False, False)
self._timer.stop()
self._audio_progress.reset()
self._enable_buttons(True, False, False)
else:
self.__audio_progress.setValue(pos)
self._audio_progress.setValue(pos)
def __set_max_progress_bar(self):
def _set_max_progress_bar(self):
"""Set the maximum value of the progress bar."""
self.__audio_progress.setMaximum(
mixer.Sound(self.__audio_file).get_length() * 1000
self._audio_progress.setMaximum(
mixer.Sound(self._audio_file).get_length() * 1000
)
def set_audio_player(self, fname=""):
"""Set the current audio sample."""
self.__first_call = True
self.__reset_audio_widget()
self._first_call = True
self._reset_audio_widget()
full_name = os.path.join(
Constants.DATA_FOLDER,
Constants.AUDIO_FOLDER,
fname + '.ogg'
)
if os.path.exists(full_name):
self.__play.setEnabled(True)
self.__audio_file = full_name
self._play.setEnabled(True)
self._audio_file = full_name
@pyqtSlot()
def __play_audio(self):
def _play_audio(self):
"""Play the audio sample."""
if not self.__paused:
if self.__first_call:
self.__first_call = False
if not self._paused:
if self._first_call:
self._first_call = False
mixer.init(frequency=AudioSegment.from_ogg(
self.__audio_file
self._audio_file
).frame_rate,
buffer=2048)
mixer.music.load(self.__audio_file)
self.__set_volume()
self.__set_max_progress_bar()
mixer.music.load(self._audio_file)
self._set_volume()
self._set_max_progress_bar()
mixer.music.play()
else:
mixer.music.unpause()
self.__paused = False
self.__timer.start(self.__time_step)
self.__enable_buttons(False, True, True)
self._paused = False
self._timer.start(self._time_step)
self._enable_buttons(False, True, True)
@pyqtSlot()
def __stop_audio(self):
def _stop_audio(self):
"""Stop the audio sample."""
mixer.music.stop()
self.__audio_progress.reset()
self.__timer.stop()
self.__enable_buttons(True, False, False)
self._audio_progress.reset()
self._timer.stop()
self._enable_buttons(True, False, False)
@pyqtSlot()
def __pause_audio(self):
def _pause_audio(self):
"""Pause the audio sample."""
mixer.music.pause()
self.__timer.stop()
self.__paused = True
self.__enable_buttons(True, False, False)
self._timer.stop()
self._paused = True
self._enable_buttons(True, False, False)
def __enable_buttons(self, play_en, pause_en, stop_en):
def _enable_buttons(self, play_en, pause_en, stop_en):
"""Set the three buttons status."""
self.__play.setEnabled(play_en)
self.__pause.setEnabled(pause_en)
self.__stop.setEnabled(stop_en)
self._play.setEnabled(play_en)
self._pause.setEnabled(pause_en)
self._stop.setEnabled(stop_en)

View File

@@ -10,21 +10,21 @@ class ClickableProgressBar(QProgressBar):
def __init__(self, parent=None):
"""Initialize the instance."""
self.__text = ''
self._text = ''
super().__init__(parent)
def text(self):
"""Return the text displayed on the bar."""
return self.__text
return self._text
def set_idle(self):
"""Set the bar to a non-downloading status."""
self.__text = Constants.CLICK_TO_UPDATE_STR
self._text = Constants.CLICK_TO_UPDATE_STR
self.setMaximum(self.minimum() + 1)
def set_updating(self):
"""Set the bar to a downloading status."""
self.__text = Constants.UPDATING_STR
self._text = Constants.UPDATING_STR
self.setMaximum(self.minimum())
def mousePressEvent(self, event):

View File

@@ -100,6 +100,9 @@ class ForecastColors:
KP5_COLOR = "#BEE3FE"
_Band = namedtuple("Band", ["lower", "upper"])
class Constants:
"""Container class for several constants of the software."""
@@ -138,19 +141,18 @@ class Constants:
LABEL_ON_COLOR = "on"
LABEL_OFF_COLOR = "off"
TEXT_COLOR = "text"
__Band = namedtuple("Band", ["lower", "upper"])
__ELF = __Band(0, 30) # Formally it is (3, 30) Hz.
__SLF = __Band(30, 300)
__ULF = __Band(300, 3000)
__VLF = __Band(3000, 30000)
__LF = __Band(30 * 10**3, 300 * 10**3)
__MF = __Band(300 * 10 ** 3, 3000 * 10**3)
__HF = __Band(3 * 10**6, 30 * 10**6)
__VHF = __Band(30 * 10**6, 300 * 10**6)
__UHF = __Band(300 * 10**6, 3000 * 10**6)
__SHF = __Band(3 * 10**9, 30 * 10**9)
__EHF = __Band(30 * 10**9, 300 * 10**9)
BANDS = (__ELF, __SLF, __ULF, __VLF, __LF, __MF, __HF, __VHF, __UHF, __SHF, __EHF)
_ELF = _Band(0, 30) # Formally it is (3, 30) Hz.
_SLF = _Band(30, 300)
_ULF = _Band(300, 3000)
_VLF = _Band(3000, 30000)
_LF = _Band(30 * 10**3, 300 * 10**3)
_MF = _Band(300 * 10 ** 3, 3000 * 10**3)
_HF = _Band(3 * 10**6, 30 * 10**6)
_VHF = _Band(30 * 10**6, 300 * 10**6)
_UHF = _Band(300 * 10**6, 3000 * 10**6)
_SHF = _Band(3 * 10**9, 30 * 10**9)
_EHF = _Band(30 * 10**9, 300 * 10**9)
BANDS = (_ELF, _SLF, _ULF, _VLF, _LF, _MF, _HF, _VHF, _UHF, _SHF, _EHF)
MAX_DIGITS = 3
RANGE_SEPARATOR = ' ÷ '
GFD_SITE = "http://qrg.globaltuners.com/"

View File

@@ -10,12 +10,12 @@ class DoubleTextButton(QPushButton):
def __init__(self, parent=None):
"""Extends QPushButton.__init__."""
super().__init__(parent)
self.clicked.connect(self.__manage_click)
self.clicked.connect(self._manage_click)
def set_texts(self, text_a, text_b):
"""Set the two texts to be displayed."""
self.__text_a = text_a
self.__text_b = text_b
self._text_a = text_a
self._text_b = text_b
def set_slave_filters(self, simple_ones=None,
radio_1=None,
@@ -30,25 +30,25 @@ class DoubleTextButton(QPushButton):
ruled_by_radio_1 -- a list of widgets whose status depend upon radio_1.
radio_2 -- a radio button.
ruled_by_radio_2 -- a list of widgets whose status depend upon radio_2."""
self.__simple_ones = simple_ones
self.__ruled_by_radio_1 = ruled_by_radio_1
self.__radio_1 = radio_1
self.__ruled_by_radio_2 = ruled_by_radio_2
self.__radio_2 = radio_2
self._simple_ones = simple_ones
self._ruled_by_radio_1 = ruled_by_radio_1
self._radio_1 = radio_1
self._ruled_by_radio_2 = ruled_by_radio_2
self._radio_2 = radio_2
@pyqtSlot()
def __manage_click(self):
def _manage_click(self):
"""Set the status of all the 'slave widgets' based on the status of the instance."""
if self.isChecked():
self.setText(self.__text_b)
self.setText(self._text_b)
enable = False
else:
self.setText(self.__text_a)
self.setText(self._text_a)
enable = True
for f in self.__simple_ones:
for f in self._simple_ones:
f.setEnabled(enable)
radio_btns = self.__radio_1, self.__radio_2
ruled_widgets = self.__ruled_by_radio_1, self.__ruled_by_radio_2
radio_btns = self._radio_1, self._radio_2
ruled_widgets = self._ruled_by_radio_1, self._ruled_by_radio_2
for radio_btn, ruled_by in zip(radio_btns, ruled_widgets):
if ruled_by:
for f in ruled_by:

View File

@@ -28,64 +28,64 @@ class DownloadWindow(QWidget, Ui_Download_window):
# Qt.WindowStaysOnTopHint
)
self.__no_internet_msg = pop_up(self, title=Messages.NO_CONNECTION,
self._no_internet_msg = pop_up(self, title=Messages.NO_CONNECTION,
text=Messages.NO_CONNECTION_MSG,
connection=self.close)
self.__bad_db_download_msg = pop_up(self, title=Messages.BAD_DOWNLOAD,
self._bad_db_download_msg = pop_up(self, title=Messages.BAD_DOWNLOAD,
text=Messages.BAD_DOWNLOAD_MSG,
connection=self.close)
self.__download_thread = DownloadThread()
self.__download_thread.finished.connect(self.__wait_close)
self.__download_thread.progress.connect(self.__display_progress)
self.cancel_btn.clicked.connect(self.__terminate_process)
self._download_thread = DownloadThread()
self._download_thread.finished.connect(self._wait_close)
self._download_thread.progress.connect(self._display_progress)
self.cancel_btn.clicked.connect(self._terminate_process)
def start_download(self):
"""Start the download thread."""
self.__download_thread.start()
self._download_thread.start()
def __downlaod_format_str(self, n, speed):
def _downlaod_format_str(self, n, speed):
"""Return a well-formatted string with downloaded MB and speed."""
return f"Downloaded MB: {n}\nSpeed: {speed} MB/s"
def show(self):
"""Extends QWidget.show. Set downloaded MB and speed to zero."""
self.status_lbl.setText(self.__downlaod_format_str(0, 0))
self.status_lbl.setText(self._downlaod_format_str(0, 0))
super().show()
@pyqtSlot(int, float)
def __display_progress(self, progress, speed):
def _display_progress(self, progress, speed):
"""Display the downloaded MB and speed."""
if progress != Constants.EXTRACTING_CODE:
self.status_lbl.setText(self.__downlaod_format_str(progress, speed))
self.status_lbl.setText(self._downlaod_format_str(progress, speed))
elif progress == Constants.EXTRACTING_CODE:
self.status_lbl.setText(Constants.EXTRACTING_MSG + '\n')
@pyqtSlot()
def __terminate_process(self):
def _terminate_process(self):
"""Terminate the download thread and close."""
if self.__download_thread.isRunning():
self.__download_thread.terminate()
self.__download_thread.wait()
if self._download_thread.isRunning():
self._download_thread.terminate()
self._download_thread.wait()
self.close()
@pyqtSlot()
def __wait_close(self):
def _wait_close(self):
"""Decide the action based on the download thread status and close."""
if self.__download_thread.status is ThreadStatus.OK:
if self._download_thread.status is ThreadStatus.OK:
self.complete.emit()
self.close()
elif self.__download_thread.status is ThreadStatus.NO_CONNECTION_ERR:
self.__no_internet_msg.show()
elif self.__download_thread.status is ThreadStatus.BAD_DOWNLOAD_ERR:
self.__bad_db_download_msg.show()
elif self._download_thread.status is ThreadStatus.NO_CONNECTION_ERR:
self._no_internet_msg.show()
elif self._download_thread.status is ThreadStatus.BAD_DOWNLOAD_ERR:
self._bad_db_download_msg.show()
else:
self.close()
def reject(self):
"""Extends QWidget.reject. Terminate the download thread."""
if self.__download_thread.isRunning():
self.__download_thread.terminate()
self.__download_thread.wait()
if self._download_thread.isRunning():
self._download_thread.terminate()
self._download_thread.wait()
super().reject()

View File

@@ -35,16 +35,16 @@ class SwitchableLabel(_BaseSwitchableLabel):
Apply the active state colors."""
super().switch_on()
self.__apply_colors(*self.switch_on_colors)
self._apply_colors(*self.switch_on_colors)
def switch_off(self):
"""Extend _BaseSwitchableLabel.switch_off.
Apply the inactive state colors."""
super().switch_off()
self.__apply_colors(*self.switch_off_colors)
self._apply_colors(*self.switch_off_colors)
def __apply_colors(self, start, end):
def _apply_colors(self, start, end):
"""Set text and background color of the label."""
self.setStyleSheet(
f"""

View File

@@ -66,12 +66,13 @@ class _ColorsHandler:
else:
self.is_simple_string = True
self.color_str = color_str
self.is_valid = self.__color_is_valid()
self.is_valid = self._color_is_valid()
def __color_is_valid(self):
def _color_is_valid(self):
"""Return if the color (or the list of colors) has a valid html format."""
pattern = "#([a-zA-Z0-9]){6}"
match_ok = lambda col: bool(re.match(pattern, col)) and len(col) == 7
def match_ok(col): return bool(re.match(pattern, col)) and len(col) == 7
if not self.is_simple_string:
if len(self.color_list) <= self.MAX_COLORS:
return all(match_ok(c) for c in self.color_list)
@@ -103,96 +104,97 @@ class _ColorsHandler:
double_color_list.append(color)
if simple_color_list or double_color_list:
return cls(simple_color_list, double_color_list)
return None
class ThemeManager:
"""Manage all the operations releted to the themes."""
def __init__(self, parent):
def __init__(self, owner):
"""Initialize the ThemeManager instance."""
self.__parent = parent
self.__parent.active_color = ThemeConstants.DEFAULT_ACTIVE_COLOR
self.__parent.inactive_color = ThemeConstants.DEFAULT_INACTIVE_COLOR
self._owner = owner
self._owner.active_color = ThemeConstants.DEFAULT_ACTIVE_COLOR
self._owner.inactive_color = ThemeConstants.DEFAULT_INACTIVE_COLOR
self.__theme_path = ""
self.__current_theme = ""
self._theme_path = ""
self._current_theme = ""
self.__space_weather_labels = SwitchableLabelsIterable(
self._space_weather_labels = SwitchableLabelsIterable(
*list(
chain(
self.__parent.switchable_r_labels,
self.__parent.switchable_s_labels,
self.__parent.switchable_g_now_labels,
self.__parent.switchable_g_today_labels,
self.__parent.k_storm_labels,
self.__parent.a_storm_labels,
[self.__parent.expected_noise_lbl]
self._owner.switchable_r_labels,
self._owner.switchable_s_labels,
self._owner.switchable_g_now_labels,
self._owner.switchable_g_today_labels,
self._owner.k_storm_labels,
self._owner.a_storm_labels,
[self._owner.expected_noise_lbl]
)
)
)
self.__space_weather_labels.set(
self._space_weather_labels.set(
"switch_on_colors",
ThemeConstants.DEFAULT_ON_COLORS
)
self.__space_weather_labels.set(
self._space_weather_labels.set(
"switch_off_colors", ThemeConstants.DEFAULT_OFF_COLORS
)
self.__theme_names = {}
self._theme_names = {}
def __refresh_range_labels(self):
def _refresh_range_labels(self):
"""Refresh the range-labels."""
self.__parent.set_acf_interval_label()
self.__parent.set_band_filter_label(
self.__parent.activate_low_band_filter_btn,
self.__parent.lower_band_spinbox,
self.__parent.lower_band_filter_unit,
self.__parent.lower_band_confidence,
self.__parent.activate_up_band_filter_btn,
self.__parent.upper_band_spinbox,
self.__parent.upper_band_filter_unit,
self.__parent.upper_band_confidence,
self.__parent.band_range_lbl
self._owner.set_acf_interval_label()
self._owner.set_band_filter_label(
self._owner.activate_low_band_filter_btn,
self._owner.lower_band_spinbox,
self._owner.lower_band_filter_unit,
self._owner.lower_band_confidence,
self._owner.activate_up_band_filter_btn,
self._owner.upper_band_spinbox,
self._owner.upper_band_filter_unit,
self._owner.upper_band_confidence,
self._owner.band_range_lbl
)
self.__parent.set_band_filter_label(
self.__parent.activate_low_freq_filter_btn,
self.__parent.lower_freq_spinbox,
self.__parent.lower_freq_filter_unit,
self.__parent.lower_freq_confidence,
self.__parent.activate_up_freq_filter_btn,
self.__parent.upper_freq_spinbox,
self.__parent.upper_freq_filter_unit,
self.__parent.upper_freq_confidence,
self.__parent.freq_range_lbl
self._owner.set_band_filter_label(
self._owner.activate_low_freq_filter_btn,
self._owner.lower_freq_spinbox,
self._owner.lower_freq_filter_unit,
self._owner.lower_freq_confidence,
self._owner.activate_up_freq_filter_btn,
self._owner.upper_freq_spinbox,
self._owner.upper_freq_filter_unit,
self._owner.upper_freq_confidence,
self._owner.freq_range_lbl
)
@pyqtSlot()
def __apply(self, theme_path):
def _apply(self, theme_path):
"""Apply the selected theme.
Refresh all relevant widgets.
Display a QMessageBox if the theme is not found."""
self.__theme_path = theme_path
self._theme_path = theme_path
if os.path.exists(theme_path):
if self.__theme_path != self.__current_theme:
self.__change()
self.__parent.display_specs(
item=self.__parent.signals_list.currentItem(),
if self._theme_path != self._current_theme:
self._change()
self._owner.display_specs(
item=self._owner.signals_list.currentItem(),
previous_item=None
)
self.__refresh_range_labels()
self.__parent.audio_widget.refresh_btns_colors(
self.__parent.active_color,
self.__parent.inactive_color
self._refresh_range_labels()
self._owner.audio_widget.refresh_btns_colors(
self._owner.active_color,
self._owner.inactive_color
)
self.__space_weather_labels.refresh()
self._space_weather_labels.refresh()
else:
pop_up(self.__parent, title=ThemeConstants.THEME_NOT_FOUND,
pop_up(self._owner, title=ThemeConstants.THEME_NOT_FOUND,
text=ThemeConstants.MISSING_THEME).show()
def __pretty_name(self, bad_name):
def _pretty_name(self, bad_name):
"""Return a well-formatted theme name."""
return ' '.join(
map(lambda s: s.capitalize(),
@@ -200,13 +202,13 @@ class ThemeManager:
)
)
def __detect_themes(self):
def _detect_themes(self):
"""Detect all available themes.
Connect all the actions to change the theme.
Display a QMessageBox if the theme folder is not found."""
themes = []
ag = QActionGroup(self.__parent, exclusive=True)
ag = QActionGroup(self._owner, exclusive=True)
if os.path.exists(ThemeConstants.FOLDER):
for theme_folder in sorted(os.listdir(ThemeConstants.FOLDER)):
relative_folder = os.path.join(ThemeConstants.FOLDER, theme_folder)
@@ -214,37 +216,37 @@ class ThemeManager:
relative_folder = os.path.join(ThemeConstants.FOLDER, theme_folder)
themes.append(relative_folder)
for theme_path in themes:
theme_name = '&' + self.__pretty_name(os.path.basename(theme_path))
theme_name = '&' + self._pretty_name(os.path.basename(theme_path))
new_theme = ag.addAction(
QAction(
theme_name,
self.__parent, checkable=True
self._owner, checkable=True
)
)
self.__parent.menu_themes.addAction(new_theme)
self.__theme_names[theme_name.lstrip('&')] = new_theme
new_theme.triggered.connect(partial(self.__apply, theme_path))
self._owner.menu_themes.addAction(new_theme)
self._theme_names[theme_name.lstrip('&')] = new_theme
new_theme.triggered.connect(partial(self._apply, theme_path))
else:
pop_up(self.__parent, title=ThemeConstants.THEME_FOLDER_NOT_FOUND,
pop_up(self._owner, title=ThemeConstants.THEME_FOLDER_NOT_FOUND,
text=ThemeConstants.MISSING_THEME_FOLDER).show()
def __change(self):
def _change(self):
"""Change the current theme.
Apply the stylesheet and set active and inactive colors.
Set all the new images needed.
Save the new current theme on file."""
theme_name = os.path.basename(self.__theme_path) + ThemeConstants.EXTENSION
theme_name = os.path.basename(self._theme_path) + ThemeConstants.EXTENSION
try:
with open(os.path.join(self.__theme_path, theme_name), "r") as stylesheet:
with open(os.path.join(self._theme_path, theme_name), "r") as stylesheet:
style = stylesheet.read()
self.__parent.setStyleSheet(style)
self.__parent.download_window.setStyleSheet(style)
self._owner.setStyleSheet(style)
self._owner.download_window.setStyleSheet(style)
except FileNotFoundError:
pop_up(self.__parent, title=ThemeConstants.THEME_NOT_FOUND,
pop_up(self._owner, title=ThemeConstants.THEME_NOT_FOUND,
text=ThemeConstants.MISSING_THEME).show()
else:
icons_path = os.path.join(self.__theme_path, ThemeConstants.ICONS_FOLDER)
icons_path = os.path.join(self._theme_path, ThemeConstants.ICONS_FOLDER)
path_to_search_label = os.path.join(icons_path,Constants.SEARCH_LABEL_IMG)
@@ -253,13 +255,13 @@ class ThemeManager:
else:
path = ThemeConstants.DEFAULT_SEARCH_LABEL_PATH
self.__parent.search_label.setPixmap(QPixmap(path))
self.__parent.modulation_search_label.setPixmap(QPixmap(path))
self.__parent.location_search_label.setPixmap(QPixmap(path))
self._owner.search_label.setPixmap(QPixmap(path))
self._owner.modulation_search_label.setPixmap(QPixmap(path))
self._owner.location_search_label.setPixmap(QPixmap(path))
self.__parent.search_label.setScaledContents(True)
self.__parent.modulation_search_label.setScaledContents(True)
self.__parent.location_search_label.setScaledContents(True)
self._owner.search_label.setScaledContents(True)
self._owner.modulation_search_label.setScaledContents(True)
self._owner.location_search_label.setScaledContents(True)
path_to_volume_label = os.path.join(icons_path,Constants.VOLUME_LABEL_IMG)
@@ -268,10 +270,10 @@ class ThemeManager:
else:
path = ThemeConstants.DEFAULT_VOLUME_LABEL_PATH
self.__parent.volume_label.setPixmap(QPixmap(path))
self.__parent.volume_label.setScaledContents(True)
self._owner.volume_label.setPixmap(QPixmap(path))
self._owner.volume_label.setScaledContents(True)
path_to_colors = os.path.join(self.__theme_path,ThemeConstants.COLORS)
path_to_colors = os.path.join(self._theme_path,ThemeConstants.COLORS)
active_color_ok = False
inactive_color_ok = False
@@ -286,79 +288,79 @@ class ThemeManager:
if color_handler is not None:
for color in color_handler.simple_color_list:
if color.quality == Constants.ACTIVE:
self.__parent.active_color = color.color_str
self._owner.active_color = color.color_str
active_color_ok = True
if color.quality == Constants.INACTIVE:
self.__parent.inactive_color = color.color_str
self._owner.inactive_color = color.color_str
inactive_color_ok = True
if color.quality == Constants.TEXT_COLOR:
text_color_ok = True
self.__space_weather_labels.set(
self._space_weather_labels.set(
"text_color",
color.color_str
)
for color in color_handler.double_color_list:
if color.quality == Constants.LABEL_ON_COLOR:
switch_on_color_ok = True
self.__space_weather_labels.set(
self._space_weather_labels.set(
"switch_on_colors",
color.color_list
)
if color.quality == Constants.LABEL_OFF_COLOR:
switch_off_color_ok = True
self.__space_weather_labels.set(
self._space_weather_labels.set(
"switch_off_colors",
color.color_list
)
if not (active_color_ok and inactive_color_ok):
self.__parent.active_color = ThemeConstants.DEFAULT_ACTIVE_COLOR
self.__parent.inactive_color = ThemeConstants.DEFAULT_INACTIVE_COLOR
self._owner.active_color = ThemeConstants.DEFAULT_ACTIVE_COLOR
self._owner.inactive_color = ThemeConstants.DEFAULT_INACTIVE_COLOR
if not (switch_on_color_ok and switch_off_color_ok):
self.__space_weather_labels.set(
self._space_weather_labels.set(
"switch_on_colors",
ThemeConstants.DEFAULT_ON_COLORS
)
self.__space_weather_labels.set(
self._space_weather_labels.set(
"switch_off_colors",
ThemeConstants.DEFAULT_OFF_COLORS
)
if not text_color_ok:
self.__space_weather_labels.set(
self._space_weather_labels.set(
"text_color",
ThemeConstants.DEFAULT_TEXT_COLOR
)
self.__current_theme = self.__theme_path
self._current_theme = self._theme_path
try:
with open(ThemeConstants.CURRENT_THEME_FILE, "w") as current_theme:
current_theme.write(self.__theme_path)
current_theme.write(self._theme_path)
except Exception:
pass
def start(self):
"""Start the theme manager."""
self.__detect_themes()
self._detect_themes()
if os.path.exists(ThemeConstants.CURRENT_THEME_FILE):
with open(ThemeConstants.CURRENT_THEME_FILE, "r") as current_theme_path:
theme_path = current_theme_path.read()
theme_name = self.__pretty_name(os.path.basename(theme_path))
theme_name = self._pretty_name(os.path.basename(theme_path))
try:
self.__theme_names[theme_name].setChecked(True)
self._theme_names[theme_name].setChecked(True)
except Exception:
pop_up(self.__parent, title=ThemeConstants.THEME_NOT_FOUND,
pop_up(self._owner, title=ThemeConstants.THEME_NOT_FOUND,
text=ThemeConstants.MISSING_THEME).show()
else:
self.__apply(theme_path)
self._apply(theme_path)
else:
try:
self.__theme_names[
self.__pretty_name(ThemeConstants.DEFAULT)
self._theme_names[
self._pretty_name(ThemeConstants.DEFAULT)
].setChecked(True)
except Exception:
pop_up(self.__parent, title=ThemeConstants.THEME_NOT_FOUND,
pop_up(self._owner, title=ThemeConstants.THEME_NOT_FOUND,
text=ThemeConstants.MISSING_THEME).show()
else:
self.__apply(ThemeConstants.DEFAULT_THEME_PATH)
self._apply(ThemeConstants.DEFAULT_THEME_PATH)

View File

@@ -47,7 +47,7 @@ class DownloadThread(BaseDownloadThread):
"""Just call super().__init__."""
super().__init__()
def __pretty_len(self, byte_obj):
def _pretty_len(self, byte_obj):
"""Return a well-formatted number of downloaded MB."""
mega = len(byte_obj) / self.CHUNK
if mega.is_integer():
@@ -55,11 +55,10 @@ class DownloadThread(BaseDownloadThread):
else:
return ceil(mega)
def __get_download_speed(self, data, delta):
def _get_download_speed(self, data, delta):
"""Return the download speed in MB/s."""
return round(
(len(data) / self.CHUNK) / delta,
2
(len(data) / self.CHUNK) / delta, 2
)
def run(self):
@@ -83,8 +82,8 @@ class DownloadThread(BaseDownloadThread):
break
raw_data += data
self.progress.emit(
self.__pretty_len(raw_data),
self.__get_download_speed(data, delta)
self._pretty_len(raw_data),
self._get_download_speed(data, delta)
)
db.release_conn()
except Exception: # No internet connection.
@@ -127,34 +126,34 @@ class _AsyncDownloader:
class UpdateSpaceWeatherThread(BaseDownloadThread, _AsyncDownloader):
"""Subclass BaseDownloadThread. Downlaod the space weather data."""
__properties = ("xray", "prot_el", "ak_index", "sgas", "geo_storm")
_properties = ("xray", "prot_el", "ak_index", "sgas", "geo_storm")
def __init__(self, space_weather_data):
"""Initialize the a local space_weather_data."""
super().__init__()
self.__space_weather_data = space_weather_data
self._space_weather_data = space_weather_data
async def __download_property(self, session, property_name):
async def _download_property(self, session, property_name):
"""Download the data conteining the information of a specific property."""
link = getattr(Constants, "SPACE_WEATHER_" + property_name.upper())
data = await self._download_resource(session, link)
setattr(self.__space_weather_data, property_name, str(data, 'utf-8'))
setattr(self._space_weather_data, property_name, str(data, 'utf-8'))
async def __download_image(self, session, n):
async def _download_image(self, session, n):
"""Download the data corresponding the n-th image displayed in the screen."""
im = await self._download_resource(
session, Constants.SPACE_WEATHER_IMGS[n]
)
self.__space_weather_data.images[n].loadFromData(im)
self._space_weather_data.images[n].loadFromData(im)
async def _download_resources(self):
"""Download all the data."""
session = aiohttp.ClientSession()
try:
t = []
for p in self.__properties:
for p in self._properties:
t.append(
asyncio.create_task(self.__download_property(session, p))
asyncio.create_task(self._download_property(session, p))
)
tot_images = range(len(Constants.SPACE_WEATHER_IMGS))
@@ -162,7 +161,7 @@ class UpdateSpaceWeatherThread(BaseDownloadThread, _AsyncDownloader):
for im_number in tot_images:
t1.append(
asyncio.create_task(
self.__download_image(session, im_number)
self._download_image(session, im_number)
)
)
await asyncio.gather(*t, *t1)
@@ -192,7 +191,7 @@ class UpdateForecastThread(BaseDownloadThread, _AsyncDownloader):
super().__init__()
self.owner = owner
async def __download_property(self, session, link, prop_name):
async def _download_property(self, session, link, prop_name):
"""Download the data from 'link' and set the corresponding property of the owner."""
resp = await self._download_resource(session, link)
resp = str(resp, 'utf-8')
@@ -207,14 +206,14 @@ class UpdateForecastThread(BaseDownloadThread, _AsyncDownloader):
try:
await asyncio.gather(
asyncio.create_task(
self.__download_property(
self._download_property(
session,
Constants.SPACE_WEATHER_GEO_STORM,
self._PropertyName.FORECAST
)
),
asyncio.create_task(
self.__download_property(
self._download_property(
session,
Constants.FORECAST_PROBABILITIES,
self._PropertyName.PROBABILITIES

View File

@@ -113,6 +113,95 @@ class SpaceWeatherData(_BaseWeatherData):
]
def _make_labels_table(forecast, probabilities, rows):
"""Organize all the arguments to feed _get_lbl_value."""
def get_first_split(x): return x.split("/")[0]
def get_second_split(x): return x.split("/")[1]
def get_third_split(x): return x.split("/")[2]
solar_row = rows["solar_row"]
event_row = rows["event_row"]
rb_now_row = rows["rb_now_row"]
ga_now_row = rows["ga_now_row"]
kp_index_row = rows["kp_index_row"]
return [
[
[forecast, solar_row, 3, None],
[probabilities, event_row + 1, 2, get_first_split],
[probabilities, event_row + 2, 2, get_first_split],
[probabilities, event_row + 3, 1, get_first_split],
[forecast, rb_now_row, 1, None],
[forecast, rb_now_row + 1, 3, None],
[probabilities, ga_now_row + 2, 1, get_first_split],
[probabilities, ga_now_row + 3, 2, get_first_split],
[probabilities, ga_now_row + 4, 2, get_first_split],
[probabilities, ga_now_row + 6, 1, get_first_split],
[probabilities, ga_now_row + 7, 2, get_first_split],
[probabilities, ga_now_row + 8, 2, get_first_split],
[forecast, kp_index_row + 3, 1, None],
[forecast, kp_index_row + 4, 1, None],
[forecast, kp_index_row + 5, 1, None],
[forecast, kp_index_row + 6, 1, None],
[forecast, kp_index_row + 7, 1, None],
[forecast, kp_index_row + 8, 1, None],
[forecast, kp_index_row + 9, 1, None],
[forecast, kp_index_row + 10, 1, None]
],
[
[forecast, solar_row, 4, None],
[probabilities, event_row + 1, 2, get_second_split],
[probabilities, event_row + 2, 2, get_second_split],
[probabilities, event_row + 3, 1, get_second_split],
[forecast, rb_now_row, 2, None],
[forecast, rb_now_row + 1, 4, None],
[probabilities, ga_now_row + 2, 1, get_second_split],
[probabilities, ga_now_row + 3, 2, get_second_split],
[probabilities, ga_now_row + 4, 2, get_second_split],
[probabilities, ga_now_row + 6, 1, get_second_split],
[probabilities, ga_now_row + 7, 2, get_second_split],
[probabilities, ga_now_row + 8, 2, get_second_split],
[forecast, kp_index_row + 3, 2, None],
[forecast, kp_index_row + 4, 2, None],
[forecast, kp_index_row + 5, 2, None],
[forecast, kp_index_row + 6, 2, None],
[forecast, kp_index_row + 7, 2, None],
[forecast, kp_index_row + 8, 2, None],
[forecast, kp_index_row + 9, 2, None],
[forecast, kp_index_row + 10, 2, None]
],
[
[forecast, solar_row, 5, None],
[probabilities, event_row + 1, 2, get_third_split],
[probabilities, event_row + 2, 2, get_third_split],
[probabilities, event_row + 3, 1, get_third_split],
[forecast, rb_now_row, 3, None],
[forecast, rb_now_row + 1, 5, None],
[probabilities, ga_now_row + 2, 1, get_third_split],
[probabilities, ga_now_row + 3, 2, get_third_split],
[probabilities, ga_now_row + 4, 2, get_third_split],
[probabilities, ga_now_row + 6, 1, get_third_split],
[probabilities, ga_now_row + 7, 2, get_third_split],
[probabilities, ga_now_row + 8, 2, get_third_split],
[forecast, kp_index_row + 3, 3, None],
[forecast, kp_index_row + 4, 3, None],
[forecast, kp_index_row + 5, 3, None],
[forecast, kp_index_row + 6, 3, None],
[forecast, kp_index_row + 7, 3, None],
[forecast, kp_index_row + 8, 3, None],
[forecast, kp_index_row + 9, 3, None],
[forecast, kp_index_row + 10, 3, None]
]
]
def _get_lbl_value(data, row, col, f=None):
"""Return the well-formatted string-value of the label."""
val = data[row][col]
if f is not None:
val = f(val)
val = val.rstrip('%')
if len(val) > 1:
val = val.lstrip('0')
return val
class ForecastData(_BaseWeatherData):
"""3-day forecast class. Extends _BaseWeatherData."""
@@ -124,49 +213,39 @@ class ForecastData(_BaseWeatherData):
"kp_index_row": "NOAA Kp index breakdown"
}
def __init__(self, parent):
LABELS_PER_COLUMN = 20
def __init__(self, owner):
"""Initialize all attributes and connect the thread to _parse_and_emit_signal."""
super().__init__()
self.forecast = ''
self.probabilities = ''
self.__labels_table = []
self.__solar_row = None
self.__event_row = None
self.__rb_now_row = None
self.__ga_now_row = None
self.__kp_index_row = None
self.forecast = []
self.probabilities = []
self._update_thread = UpdateForecastThread(self)
self._update_thread.finished.connect(self._parse_and_emit_signal)
# Cannot use '__' here because of the for loop below.
self._today_lbl = parent.today_lbl
self._today_p1_lbl = parent.today_p1_lbl
self._today_p2_lbl = parent.today_p2_lbl
self.__today_lbls = []
self.__today_p1_lbls = []
self.__today_p2_lbls = []
self.__all_lbls = []
self._today_lbl = owner.today_lbl
self._today_p1_lbl = owner.today_p1_lbl
self._today_p2_lbl = owner.today_p2_lbl
today_lbls = []
today_p1_lbls = []
today_p2_lbls = []
flags = ['', 'p1_', 'p2_']
for flag in flags:
title_lbl = getattr(self, "_today_" + flag + "lbl")
title_lbl.setText("-")
for index in range(20):
for index in range(self.LABELS_PER_COLUMN):
label = getattr(
parent,
owner,
"forecast_today_" + flag + str(index) + "_lbl"
)
label.setText(Constants.UNKNOWN)
if flag == flags[0]:
self.__today_lbls.append(label)
today_lbls.append(label)
if flag == flags[1]:
self.__today_p1_lbls.append(label)
today_p1_lbls.append(label)
if flag == flags[2]:
self.__today_p2_lbls.append(label)
today_p2_lbls.append(label)
self.__all_lbls = [
self.__today_lbls,
self.__today_p1_lbls,
self.__today_p2_lbls
]
self._all_lbls = [today_lbls, today_p1_lbls, today_p2_lbls]
def _parse_data(self):
"""Override _BaseWeatherData._parse_data.
@@ -182,152 +261,65 @@ class ForecastData(_BaseWeatherData):
)
self.probabilities = self.probabilities.splitlines()
def __split_lists(self):
def _split_lists(self):
"""Split the elements of forecast and probabilities."""
self.forecast = [i.split() for i in self.forecast]
self.probabilities = [i.split() for i in self.probabilities]
return [i.split() for i in self.forecast], [i.split() for i in self.probabilities]
def __find_row_with(self, data, text):
def _find_row_with(self, data, text):
"""Given a list of strings, return the index of the first string containing the target text."""
for i, row in enumerate(data):
if text in row:
return i
return None
def __get_rows(self):
"""Set all the rows needed for updating the screen.
def _get_rows(self):
"""Get all the rows needed for updating the screen.
Raise an exception if something goes wrong."""
self.__solar_row = self.__find_row_with(
rows = {}
rows["solar_row"] = self._find_row_with(
self.forecast,
self.ROW_KEYWORDS["solar_row"]
)
self.__event_row = self.__find_row_with(
rows["event_row"] = self._find_row_with(
self.probabilities,
self.ROW_KEYWORDS["event_row"]
)
self.__rb_now_row = self.__find_row_with(
rows["rb_now_row"] = self._find_row_with(
self.forecast,
self.ROW_KEYWORDS["rb_now_row"]
)
self.__ga_now_row = self.__find_row_with(
rows["ga_now_row"] = self._find_row_with(
self.probabilities,
self.ROW_KEYWORDS["ga_now_row"]
)
self.__kp_index_row = self.__find_row_with(
rows["kp_index_row"] = self._find_row_with(
self.forecast,
self.ROW_KEYWORDS["kp_index_row"]
)
if any([
self.__solar_row is None,
self.__event_row is None,
self.__rb_now_row is None,
self.__ga_now_row is None,
self.__kp_index_row is None
]):
if any(row is None for row in rows.values()):
raise Exception('Missing Rows')
else:
return rows
def __set_dates(self):
def _set_dates(self, forecast, solar_row):
"""Set the date labels."""
month = self.forecast[self.__solar_row - 1][0]
today = self.forecast[self.__solar_row - 1][1]
today_p1 = self.forecast[self.__solar_row - 1][3]
today_p2 = self.forecast[self.__solar_row - 1][5]
month = forecast[solar_row - 1][0]
today = forecast[solar_row - 1][1]
today_p1 = forecast[solar_row - 1][3]
today_p2 = forecast[solar_row - 1][5]
self._today_lbl.setText(month + ' ' + today)
self._today_p1_lbl.setText(month + ' ' + today_p1)
self._today_p2_lbl.setText(month + ' ' + today_p2)
def __make_labels_table(self):
"""Organize all the arguments to feed __get_lbl_value."""
get_first_split = lambda x: x.split("/")[0]
get_second_split = lambda x: x.split("/")[1]
get_third_split = lambda x: x.split("/")[2]
self.__labels_table = [
[
[self.forecast, self.__solar_row, 3, None],
[self.probabilities, self.__event_row + 1, 2, get_first_split],
[self.probabilities, self.__event_row + 2, 2, get_first_split],
[self.probabilities, self.__event_row + 3, 1, get_first_split],
[self.forecast, self.__rb_now_row, 1, None],
[self.forecast, self.__rb_now_row + 1, 3, None],
[self.probabilities, self.__ga_now_row + 2, 1, get_first_split],
[self.probabilities, self.__ga_now_row + 3, 2, get_first_split],
[self.probabilities, self.__ga_now_row + 4, 2, get_first_split],
[self.probabilities, self.__ga_now_row + 6, 1, get_first_split],
[self.probabilities, self.__ga_now_row + 7, 2, get_first_split],
[self.probabilities, self.__ga_now_row + 8, 2, get_first_split],
[self.forecast, self.__kp_index_row + 3, 1, None],
[self.forecast, self.__kp_index_row + 4, 1, None],
[self.forecast, self.__kp_index_row + 5, 1, None],
[self.forecast, self.__kp_index_row + 6, 1, None],
[self.forecast, self.__kp_index_row + 7, 1, None],
[self.forecast, self.__kp_index_row + 8, 1, None],
[self.forecast, self.__kp_index_row + 9, 1, None],
[self.forecast, self.__kp_index_row + 10, 1, None]
],
[
[self.forecast, self.__solar_row, 4, None],
[self.probabilities, self.__event_row + 1, 2, get_second_split],
[self.probabilities, self.__event_row + 2, 2, get_second_split],
[self.probabilities, self.__event_row + 3, 1, get_second_split],
[self.forecast, self.__rb_now_row, 2, None],
[self.forecast, self.__rb_now_row + 1, 4, None],
[self.probabilities, self.__ga_now_row + 2, 1, get_second_split],
[self.probabilities, self.__ga_now_row + 3, 2, get_second_split],
[self.probabilities, self.__ga_now_row + 4, 2, get_second_split],
[self.probabilities, self.__ga_now_row + 6, 1, get_second_split],
[self.probabilities, self.__ga_now_row + 7, 2, get_second_split],
[self.probabilities, self.__ga_now_row + 8, 2, get_second_split],
[self.forecast, self.__kp_index_row + 3, 2, None],
[self.forecast, self.__kp_index_row + 4, 2, None],
[self.forecast, self.__kp_index_row + 5, 2, None],
[self.forecast, self.__kp_index_row + 6, 2, None],
[self.forecast, self.__kp_index_row + 7, 2, None],
[self.forecast, self.__kp_index_row + 8, 2, None],
[self.forecast, self.__kp_index_row + 9, 2, None],
[self.forecast, self.__kp_index_row + 10, 2, None]
],
[
[self.forecast, self.__solar_row, 5, None],
[self.probabilities, self.__event_row + 1, 2, get_third_split],
[self.probabilities, self.__event_row + 2, 2, get_third_split],
[self.probabilities, self.__event_row + 3, 1, get_third_split],
[self.forecast, self.__rb_now_row, 3, None],
[self.forecast, self.__rb_now_row + 1, 5, None],
[self.probabilities, self.__ga_now_row + 2, 1, get_third_split],
[self.probabilities, self.__ga_now_row + 3, 2, get_third_split],
[self.probabilities, self.__ga_now_row + 4, 2, get_third_split],
[self.probabilities, self.__ga_now_row + 6, 1, get_third_split],
[self.probabilities, self.__ga_now_row + 7, 2, get_third_split],
[self.probabilities, self.__ga_now_row + 8, 2, get_third_split],
[self.forecast, self.__kp_index_row + 3, 3, None],
[self.forecast, self.__kp_index_row + 4, 3, None],
[self.forecast, self.__kp_index_row + 5, 3, None],
[self.forecast, self.__kp_index_row + 6, 3, None],
[self.forecast, self.__kp_index_row + 7, 3, None],
[self.forecast, self.__kp_index_row + 8, 3, None],
[self.forecast, self.__kp_index_row + 9, 3, None],
[self.forecast, self.__kp_index_row + 10, 3, None]
]
]
def __get_lbl_value(self, data, row, col, f=None):
"""Return the well-formatted string-value of the label."""
val = data[row][col]
if f is not None:
val = f(val)
val = val.rstrip('%')
if len(val) > 1:
val = val.lstrip('0')
return val
def __set_labels_values(self):
def _set_labels_values(self, labels_table):
"""Set all the labels values."""
for lbl_list, table in zip(self.__all_lbls, self.__labels_table):
for lbl_list, table in zip(self._all_lbls, labels_table):
for lbl, row in zip(lbl_list, table):
lbl.switch_off()
value = self.__get_lbl_value(*row)
value = _get_lbl_value(*row)
lbl.level = safe_cast(value, int)
if not isinstance(lbl, MultiColorSwitchableLabel):
value += '%'
@@ -339,15 +331,15 @@ class ForecastData(_BaseWeatherData):
If an exception is raised in the process, do nothing."""
try:
self.__get_rows()
self.__split_lists()
self.__make_labels_table()
self.__set_dates()
self.__set_labels_values()
rows = self._get_rows()
forecast, probabilities = self._split_lists()
labels_table = _make_labels_table(forecast, probabilities, rows)
self._set_dates(forecast, rows["solar_row"])
self._set_labels_values(labels_table)
except Exception:
pass
def remove_data(self):
"""Remove the reference to the downloaded data."""
self.forecast = ''
self.probabilities = ''
self.forecast = []
self.probabilities = []