import datetime import io import locale import os import time from itertools import chain from pathlib import Path import matplotlib.dates as mdates import matplotlib.pyplot as plt import matplotlib.ticker as mtick import numpy as np import pandas as pd from flask import Flask, Markup, abort, render_template, request from flask_caching import Cache from download_digital import construct_dataframe, get_bez_data, get_landesbezirk, landesbezirk_dict config = { "CACHE_TYPE": "FileSystemCache", "CACHE_DEFAULT_TIMEOUT": 300, "CACHE_THRESHOLD": 50, "CACHE_DIR": "cache", } abbrev_dict = { "BBR": "Berlin-Brandenburg", "BaWü": "Baden-Württemberg", "NDS": "Niedersachsen-Bremen", "NRW": "Nordrhein-Westfalen", "RLP": "Rheinland-Pfalz-Saarland", "SAT": "Sachsen, Sachsen-Anhalt, Thüringen", } os.environ["TZ"] = "Europe/Berlin" time.tzset() locale.setlocale(locale.LC_ALL, "de_DE.UTF-8") app = Flask(__name__) app.config.from_mapping(config) cache = Cache(app) def get_tables(url: str) -> tuple[pd.DataFrame, pd.DataFrame]: bez_data = get_bez_data(["bez_data_0", "bez_data_2"], url) df = construct_dataframe(bez_data=bez_data[0], special_tag="stud") df_state = construct_dataframe(bez_data=bez_data[1]) return df, df_state def create_plot_df( curr_datetime, current_df: pd.DataFrame | None, data_folder: str = "data", sheet_name: str = "digital", ) -> pd.DataFrame: data_dict = {} ## Important: If multiple results are stored for the same date ## the last is used. So this relies on the Landesbezirk data ## to be stored with a filename that is lexigraphically larger ## than the single district results. for f in sorted(Path(data_folder).iterdir()): with f.open("rb") as ff: df = pd.read_excel(ff, sheet_name=sheet_name, index_col=0) if "Landesbezirk" not in df.columns: df["Landesbezirk"] = df.index.map(get_landesbezirk) df = df.astype({"Digitale Befragung": "Int32"}) df = df.groupby("Landesbezirk")[["Digitale Befragung"]].sum() key = f.name[:10] data_dict[key] = df["Digitale Befragung"] df = pd.DataFrame(data=data_dict).T df.index = df.index.astype("datetime64[ns]") + pd.DateOffset(hours=10) df = df.reindex( pd.date_range(start="2023-08-15", end=curr_datetime) + pd.DateOffset(hours=10) ) if current_df is not None: if "Landesbezirk" not in current_df.columns: current_df["Landesbezirk"] = current_df.index.map(get_landesbezirk) current_df = current_df.astype({"Digitale Befragung": "Int32"}) current_df = current_df.groupby("Landesbezirk")[["Digitale Befragung"]].sum() df.loc[curr_datetime] = current_df["Digitale Befragung"] if pd.isna(df.loc[df.index.max()][0]): df = df.drop([df.index.max()]) return df def plot( curr_datetime, df: pd.DataFrame, annotate_current: bool = False, total_targets: tuple[int, ...] = (1500, 2500, 3500), alpha: float | None = None, landesbez_str: str | None = None, fix_lims: bool = True, ) -> str: fig = plt.figure(dpi=300) # fill weekends max_date = curr_datetime + datetime.timedelta(days=1) days = pd.date_range(start="2023-08-14", end=max_date) for idx, day in enumerate(days[:-1]): if day.weekday() >= 5: plt.gca().axvspan(days[idx], days[idx + 1], alpha=0.2, color="gray") for bez in landesbez_str: series = df.sum(axis=1) if bez is None else df[bez] plot_df = series.to_frame("Digitale Befragung").replace(0, np.nan) plot_df = plot_df.astype({"Digitale Befragung": "float32"}) if not pd.isna(plot_df).all().item(): if alpha is not None: plt.fill_between( plot_df.dropna().index, plot_df.dropna()["Digitale Befragung"], color="#e4004e", alpha=alpha, ) (line,) = plt.plot( plot_df.dropna().index, plot_df.dropna()["Digitale Befragung"], ls="--", marker="o", lw=1, color="#e4004e" if bez is None or not fix_lims else None, markersize=4, label=bez if bez is not None else "Bundesweit", ) if annotate_current and bez is None: plt.annotate( "Jetzt", ( plot_df.dropna().index[-1], plot_df.dropna()["Digitale Befragung"][-1] * 1.03, ), fontsize=8, ha="center", ) plt.plot( plot_df.index, plot_df["Digitale Befragung"], lw=1.5, color=line.get_color(), # label=bez, ) plt.title("Teilnahme an Digitaler Beschäftigtenbefragung") plt.ylabel("# Teilnahmen") if fix_lims: max_val = df.sum(axis=1).max().item() nearest_target = np.array(total_targets, dtype=np.float32) - max_val nearest_target[nearest_target <= 0] = np.inf idx = np.argmin(nearest_target) ceil_val = max(max_val, total_targets[idx]) plt.ylim(0, ceil_val * 1.025) plt.legend() # use timezone offset to center tick labels plt.gca().xaxis.set_major_locator( mdates.WeekdayLocator([mdates.TU], tz="Etc/GMT+12") ) plt.gca().xaxis.set_minor_locator(mdates.DayLocator()) plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%a %d.%m.")) plt.grid(True, which="major", axis="y") plt.grid(True, which="minor", axis="x") plt.gca().tick_params("x", length=0, which="major") def val_to_perc(val): return 100 * val / total_targets[0] def perc_to_val(perc): return perc * total_targets[0] / 100 sec_ax = plt.gca().secondary_yaxis("right", functions=(val_to_perc, perc_to_val)) sec_ax.set_ylabel("# Teilnahmen [% Erfolg]") sec_ax.yaxis.set_major_formatter(mtick.PercentFormatter()) if fix_lims: for total_target in total_targets: plt.axhline(y=total_target, color="#48a9be", linestyle="--") plt.tight_layout() return fig def create_fig( url: str = "https://beschaeftigtenbefragung.verdi.de/", importance_factor: float = 1.0, landesbez_strs: list[str | None] | None = None, fix_lims: bool = True, ): curr_datetime = datetime.datetime.now() try: df, df_state = get_tables(url) df = df.sort_values( ["Digitale Befragung", "Bundesland", "Bezirk"], ascending=[False, True, True], ) df_state = df_state.sort_values("Landesbezirk") plot_df = create_plot_df(curr_datetime, df_state) annotate_current = True timestamp = curr_datetime.strftime("%Y-%m-%d %H:%M:%S") except Exception as e: print(e) last_file = sorted(Path("data").iterdir())[-1] key = last_file.name[:10] with (Path("data") / f"{key}_data.ods").open("rb") as ff: df = pd.read_excel(ff, sheet_name="digital", index_col=0).astype( {"Digitale Befragung": "Int32"} ) with (Path("data") / f"{key}_state_data.ods").open("rb") as ff: df_state = pd.read_excel(ff, sheet_name="digital", index_col=0).astype( {"Digitale Befragung": "Int32"} ) plot_df = create_plot_df(curr_datetime) annotate_current = False timestamp = Markup(f'{key} 10:00:00') total = plot_df.loc[curr_datetime].sum() if landesbez_strs is None: landesbez_strs = [None] + [ bez for bez in plot_df.columns if plot_df.loc[curr_datetime][bez] >= importance_factor * total ] return ( plot( curr_datetime, plot_df, annotate_current=annotate_current, landesbez_str=landesbez_strs, fix_lims=fix_lims, ), df, df_state, timestamp, ) def convert_fig_to_svg(fig: plt.Figure) -> str: # Convert plot to SVG image imgdata = io.StringIO() fig.savefig(imgdata, format="svg") imgdata.seek(0) # rewind the data return imgdata.read() def _print_as_html(df: pd.DataFrame, output_str: list[str], total: int | None = None, dropna: bool = True) -> list[str]: df = df.astype({"Digitale Befragung": "Int32"}) if dropna: df = df.dropna() with pd.option_context("display.max_rows", None): table = df.to_html( index_names=False, justify="left", index=False, classes="sortable dataframe", ) tfoot = [ " ", " ", " Gesamt", ] for i in range(len(df.columns) - 2): tfoot.append(" ") tfoot.extend( [ f" {df['Digitale Befragung'].sum()}", " ", ] ) if total and (diff := total - df['Digitale Befragung'].sum()): tfoot.extend([ " ", " Weitere Bezirke", ]) for i in range(len(df.columns) - 2): tfoot.append(" ") tfoot.extend( [ f" {diff}", " ", ] ) tfoot.append(" ") tfoot = "\n".join(tfoot) idx = table.index("") output_str.append(table[: idx - 1]) output_str.append(tfoot) output_str.append(table[idx:]) return output_str @app.route("/") @cache.cached(query_string=True) def state_dashboard(state: str): if state in abbrev_dict: state = abbrev_dict[state] if state not in landesbezirk_dict.values(): abort(404) importance_factor = request.args.get("importance") if not importance_factor: importance_factor = 1.0 else: importance_factor = float(importance_factor) fig, df, df_state, timestamp = create_fig(landesbez_strs=[state], fix_lims=False) svg_string = convert_fig_to_svg(fig) plt.close() df["Bundesland"] = df.index.map(get_landesbezirk) df = df.rename(columns={"Bundesland": "Landesbezirk"}) df_state = df_state.loc[df_state["Landesbezirk"] == state] df = df.loc[df["Landesbezirk"] == state] output_str = [] output_str = _print_as_html(df_state, output_str, dropna=False) output_str = _print_as_html(df, output_str, total=df_state['Digitale Befragung'].sum(), dropna=False) return render_template( "base.html", tables="\n".join(output_str), timestamp=timestamp, image=svg_string, ) @app.route("/") @cache.cached(query_string=True) def dashboard(): importance_factor = request.args.get("importance") if not importance_factor: importance_factor = 1.0 else: importance_factor = float(importance_factor) fig, df, df_state, timestamp = create_fig(importance_factor=importance_factor) svg_string = convert_fig_to_svg(fig) plt.close() df["Bundesland"] = df.index.map(get_landesbezirk) df = df.rename(columns={"Bundesland": "Landesbezirk"}) output_str = [] output_str = _print_as_html(df_state, output_str, dropna=False) output_str = _print_as_html(df, output_str, total=df_state['Digitale Befragung'].sum()) return render_template( "base.html", tables="\n".join(output_str), timestamp=timestamp, image=svg_string, ) if __name__ == "__main__": app.run()