import datetime
import io
import locale
import os
import time
from itertools import chain
from pathlib import Path

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import numpy as np
import pandas as pd
from flask import Flask, Markup, abort, render_template, request
from flask_caching import Cache

from download_digital import (
    construct_dataframe,
    get_bez_data,
    get_landesbezirk,
    landesbezirk_dict,
)

config = {
    "CACHE_TYPE": "FileSystemCache",
    "CACHE_DEFAULT_TIMEOUT": 300,
    "CACHE_THRESHOLD": 50,
    "CACHE_DIR": "cache",
}

abbrev_dict = {
    "BBR": "Berlin-Brandenburg",
    "BaWü": "Baden-Württemberg",
    "NDS": "Niedersachsen-Bremen",
    "NRW": "Nordrhein-Westfalen",
    "RLP": "Rheinland-Pfalz-Saarland",
    "SAT": "Sachsen, Sachsen-Anhalt, Thüringen",
}

os.environ["TZ"] = "Europe/Berlin"
time.tzset()
locale.setlocale(locale.LC_ALL, "de_DE.UTF-8")

app = Flask(__name__)
app.config.from_mapping(config)
cache = Cache(app)


def get_tables(url: str) -> tuple[pd.DataFrame, pd.DataFrame, datetime.datetime]:
    bez_data = get_bez_data(["bez_data_0", "bez_data_2"], url)
    df = construct_dataframe(bez_data=bez_data[0], special_tag="stud")
    df_state = construct_dataframe(bez_data=bez_data[1])
    return df, df_state, datetime.datetime.now()


def create_plot_df(
    curr_datetime,
    current_df: pd.DataFrame | None,
    data_folder: str = "data",
    sheet_name: str = "digital",
) -> pd.DataFrame:
    data_dict = {}
    ## Important: If multiple results are stored for the same date,
    ## the last one read is used. This relies on the Landesbezirk data
    ## being stored under a filename that is lexicographically larger
    ## than the single-district results.
    for f in sorted(Path(data_folder).iterdir()):
        with f.open("rb") as ff:
            df = pd.read_excel(ff, sheet_name=sheet_name, index_col=0)
        if "Landesbezirk" not in df.columns:
            df["Landesbezirk"] = df.index.map(get_landesbezirk)
        df = df.astype({"Digitale Befragung": "Int32"})
        df = df.groupby("Landesbezirk")[["Digitale Befragung"]].sum()
        key = f.name[:10]
        data_dict[key] = df["Digitale Befragung"]

    df = pd.DataFrame(data=data_dict).T
    max_date = df.index.max()
    df.index = df.index.astype("datetime64[ns]") + pd.DateOffset(hours=10)
    df = df.reindex(
        pd.date_range(start="2023-08-15", end=max_date) + pd.DateOffset(hours=10)
    )

    if current_df is not None:
        if "Landesbezirk" not in current_df.columns:
            current_df["Landesbezirk"] = current_df.index.map(get_landesbezirk)
        current_df = current_df.astype({"Digitale Befragung": "Int32"})
        current_df = current_df.groupby("Landesbezirk")[["Digitale Befragung"]].sum()
        df.loc[curr_datetime] = current_df["Digitale Befragung"]

    if pd.isna(df.loc[df.index.max()][0]):
        df = df.drop([df.index.max()])
    return df
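
# Worked example of the ordering assumption above (file names taken from the
# fallback branch of create_fig() further down; the date itself is illustrative):
# sorted(Path("data").iterdir()) yields, for one day,
#   data/2023-09-01_data.ods        (per-district numbers)
#   data/2023-09-01_state_data.ods  (per-Landesbezirk numbers)
# and "_data" sorts before "_state_data" lexicographically, so for a shared
# date key the Landesbezirk aggregation is read last and overwrites the
# per-district entry in data_dict.
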
def plot(
    df: pd.DataFrame,
    annotate_current: bool = False,
    total_targets: tuple[int, ...] = (1500, 2500, 3500),
    alpha: float | None = None,
    landesbez_str: list[str | None] | None = None,
    fix_lims: bool = True,
    max_shading_date=None,
) -> plt.Figure:
    fig = plt.figure(dpi=300, figsize=(8.5, 5))
    target_time = pd.Timestamp("2023-10-01")
    plt.axvline(x=target_time, color="tab:green", linestyle=":")
    if fix_lims:
        for total_target in total_targets:
            plt.axhline(y=total_target, color="#48a9be", linestyle="--")

    for bez in landesbez_str:
        series = df.sum(axis=1) if bez is None else df[bez]
        plot_df = series.to_frame("Digitale Befragung").replace(0, np.nan)
        plot_df = plot_df.astype({"Digitale Befragung": "float32"})
        if not pd.isna(plot_df).all().item():
            if alpha is not None:
                plt.fill_between(
                    plot_df.dropna().index,
                    plot_df.dropna()["Digitale Befragung"],
                    color="#e4004e",
                    alpha=alpha,
                )
            (line,) = plt.plot(
                plot_df.dropna().index,
                plot_df.dropna()["Digitale Befragung"],
                ls="--",
                marker="o",
                lw=1,
                color="#e4004e" if bez is None or not fix_lims else None,
                markersize=4,
                label=bez if bez is not None else "Bundesweit",
            )
            if annotate_current and bez is None:
                plt.annotate(
                    "Jetzt",
                    (
                        plot_df.dropna().index[-1],
                        plot_df.dropna()["Digitale Befragung"][-1] * 1.03,
                    ),
                    fontsize=8,
                    ha="center",
                )
            plt.plot(
                plot_df.index,
                plot_df["Digitale Befragung"],
                lw=1.5,
                color=line.get_color(),
                # label=bez,
            )

    plt.title("Teilnahme an Digitaler Beschäftigtenbefragung")
    plt.ylabel("# Teilnahmen")

    if fix_lims:
        max_val = df.sum(axis=1).max().item()
        nearest_target = np.array(total_targets, dtype=np.float32) - max_val
        nearest_target[nearest_target <= 0] = np.inf
        idx = np.argmin(nearest_target)
        ceil_val = max(max_val, total_targets[idx])
        plt.ylim(0, ceil_val * 1.04)

    plt.legend()

    # use a timezone offset (Etc/GMT+12 is UTC-12) to shift the major ticks by
    # twelve hours so the weekday labels appear centered under each day
    plt.gca().xaxis.set_major_locator(
        mdates.WeekdayLocator([mdates.TU], tz="Etc/GMT+12")
    )
    plt.gca().xaxis.set_minor_locator(mdates.DayLocator())
    plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%a %d.%m."))
    plt.grid(True, which="major", axis="y")
    plt.grid(True, which="minor", axis="x")
    plt.gca().tick_params("x", length=0, which="major")

    def val_to_perc(val):
        return 100 * val / total_targets[0]

    def perc_to_val(perc):
        return perc * total_targets[0] / 100

    sec_ax = plt.gca().secondary_yaxis("right", functions=(val_to_perc, perc_to_val))
    sec_ax.set_ylabel("# Teilnahmen [% Erfolg]")
    sec_ax.yaxis.set_major_formatter(mtick.PercentFormatter())

    xlim = plt.xlim()
    # shade weekends gray
    if max_shading_date is None:
        max_shading_date = df.index.max() + datetime.timedelta(days=4)
    days = pd.date_range(start="2023-08-14", end=max_shading_date)
    for idx, day in enumerate(days[:-1]):
        if day.weekday() >= 5:
            plt.gca().axvspan(days[idx], days[idx + 1], alpha=0.2, color="gray")
    # restore the left xlim and pin the right edge just past the 2023-10-01 deadline
    plt.xlim((xlim[0], pd.Timestamp("2023-10-02")))

    plt.tight_layout()
    return fig
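
# Note on the secondary y-axis in plot(): val_to_perc() expresses counts as a
# percentage of the first entry in total_targets, so with the default
# (1500, 2500, 3500) a count of 750 shows as 100 * 750 / 1500 = 50 % on the
# right-hand "% Erfolg" axis, and reaching 1500 participants reads as 100 %.
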
Befragung": "Int32"} ) with (Path("data") / f"{key}_state_data.ods").open("rb") as ff: df_state = pd.read_excel(ff, sheet_name="digital", index_col=0).astype( {"Digitale Befragung": "Int32"} ) plot_df = create_plot_df(curr_datetime, df_state) annotate_current = False timestamp = Markup(f'{key} 10:00:00') total = plot_df.loc[curr_datetime].sum() if landesbez_strs is None: landesbez_strs = [None] + [ bez for bez in plot_df.columns if plot_df.loc[curr_datetime][bez] >= importance_factor * total ] return ( plot( plot_df, annotate_current=annotate_current, landesbez_str=landesbez_strs, fix_lims=fix_lims, ), df, df_state, timestamp, ) def convert_fig_to_svg(fig: plt.Figure) -> str: # Convert plot to SVG image imgdata = io.StringIO() fig.savefig(imgdata, format="svg") imgdata.seek(0) # rewind the data return imgdata.read() def _print_as_html( df: pd.DataFrame, output_str: list[str], df_state: pd.DataFrame | None = None, dropna: bool = True, ) -> list[str]: df = df.astype({"Digitale Befragung": "Int32"}) missing_df = ( df[["Digitale Befragung"]] .isna() .join(df[["Landesbezirk"]]) .groupby("Landesbezirk") .sum() ) total = df_state["Digitale Befragung"].sum() if df_state is not None else None if df_state is not None: for idx, row in missing_df.loc[ missing_df["Digitale Befragung"] == 1 ].iterrows(): df_tmp = df.loc[df["Landesbezirk"] == idx] df_state_tmp = df_state.loc[df_state["Landesbezirk"] == idx] missing_idx = df_tmp.loc[df_tmp.isna().any(axis=1)].iloc[0].name df["Digitale Befragung"].loc[missing_idx] = ( df_state_tmp["Digitale Befragung"].sum() - df_tmp["Digitale Befragung"].sum() ) df = df.sort_values( ["Digitale Befragung", "Landesbezirk", "Bezirk"], ascending=[False, True, True], ) if dropna: df = df.dropna() with pd.option_context("display.max_rows", None): table = df.to_html( index_names=False, justify="left", index=False, classes="sortable dataframe", ) tfoot = [ "
", "