2023-12-03 12:57:01 +01:00

447 lines
13 KiB
Python

import datetime
import io
import locale
import os
import time
from itertools import chain
from pathlib import Path
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import numpy as np
import pandas as pd
from flask import Flask, Markup, abort, render_template, request
from flask_caching import Cache
from download_digital import (
construct_dataframe,
get_bez_data,
get_landesbezirk,
landesbezirk_dict,
)
# Settings handed to Flask-Caching through ``app.config`` below: responses are
# cached on the file system inside the ``cache`` directory.
config = {
    "CACHE_TYPE": "FileSystemCache",
    "CACHE_DEFAULT_TIMEOUT": 300,
    "CACHE_THRESHOLD": 50,
    "CACHE_DIR": "cache",
}
# Short URL aliases accepted by the ``/<state>`` route, mapped to the full
# Landesbezirk name they stand for.
abbrev_dict = {
    "BBR": "Berlin-Brandenburg",
    "BaWü": "Baden-Württemberg",
    "NDS": "Niedersachsen-Bremen",
    "NRW": "Nordrhein-Westfalen",
    "RLP": "Rheinland-Pfalz-Saarland",
    "SAT": "Sachsen, Sachsen-Anhalt, Thüringen",
}
# Pin the process time zone; ``time.tzset()`` makes the change take effect.
os.environ["TZ"] = "Europe/Berlin"
time.tzset()
# NOTE(review): presumably set so that date labels (e.g. "%a" in the plot's
# tick formatter) are rendered in German — requires the de_DE.UTF-8 locale to
# be installed on the host.
locale.setlocale(locale.LC_ALL, "de_DE.UTF-8")
# Flask application and its response cache; the order matters because the
# cache reads its settings from ``app.config``.
app = Flask(__name__)
app.config.from_mapping(config)
cache = Cache(app)
def get_tables(url: str) -> tuple[pd.DataFrame, pd.DataFrame, datetime.datetime]:
    """Download the live survey data and build both result tables.

    Args:
        url: Address of the survey page handed to ``get_bez_data``.

    Returns:
        A tuple ``(district_table, state_table, fetched_at)`` where the first
        table is built from ``bez_data_0`` (with ``special_tag="stud"``), the
        second from ``bez_data_2``, and ``fetched_at`` is the local time right
        after both tables were constructed.
    """
    raw_tables = get_bez_data(["bez_data_0", "bez_data_2"], url)
    district_table = construct_dataframe(bez_data=raw_tables[0], special_tag="stud")
    state_table = construct_dataframe(bez_data=raw_tables[1])
    fetched_at = datetime.datetime.now()
    return district_table, state_table, fetched_at
def _sum_by_landesbezirk(df: pd.DataFrame) -> pd.Series:
    """Return the "Digitale Befragung" totals of *df* per Landesbezirk.

    If *df* has no "Landesbezirk" column, one is derived from the index via
    ``get_landesbezirk`` and added to *df* in place (callers of
    ``create_plot_df`` may rely on that column being present afterwards).
    """
    if "Landesbezirk" not in df.columns:
        df["Landesbezirk"] = df.index.map(get_landesbezirk)
    counts = df.astype({"Digitale Befragung": "Int32"})
    return counts.groupby("Landesbezirk")["Digitale Befragung"].sum()


def create_plot_df(
    curr_datetime,
    current_df: pd.DataFrame | None,
    data_folder: str = "data",
    sheet_name: str = "digital",
) -> pd.DataFrame:
    """Build the time series of participation counts per Landesbezirk.

    Every spreadsheet in *data_folder* contributes one row, keyed by the first
    ten characters of its file name (expected to be an ISO date) and placed at
    10:00 of that day. Days without a file appear as all-NaN rows so that gaps
    stay visible in the plot.

    Args:
        curr_datetime: Timestamp under which *current_df* is stored.
        current_df: Optional live table appended as the newest row; it gains a
            "Landesbezirk" column in place if it lacks one.
        data_folder: Directory containing the stored spreadsheets.
        sheet_name: Sheet to read from each spreadsheet.

    Returns:
        A DataFrame indexed by timestamp with one column per Landesbezirk.
    """
    data_dict = {}
    # Important: if multiple results are stored for the same date, the last
    # one wins. This relies on the Landesbezirk data being stored under a file
    # name that is lexicographically larger than the single-district results.
    for f in sorted(Path(data_folder).iterdir()):
        with f.open("rb") as ff:
            df = pd.read_excel(ff, sheet_name=sheet_name, index_col=0)
        data_dict[f.name[:10]] = _sum_by_landesbezirk(df)

    df = pd.DataFrame(data=data_dict).T
    max_date = df.index.max()
    df.index = df.index.astype("datetime64[ns]") + pd.DateOffset(hours=10)
    # Insert empty rows for days without a stored file.
    df = df.reindex(
        pd.date_range(start="2023-08-15", end=max_date) + pd.DateOffset(hours=10)
    )
    if current_df is not None:
        df.loc[curr_datetime] = _sum_by_landesbezirk(current_df)
    # Drop the newest row when it carries no data. Use positional access
    # explicitly: the row is labelled by Landesbezirk names, so ``[0]`` would
    # depend on the deprecated integer-position fallback of Series.__getitem__.
    newest = df.index.max()
    if pd.isna(df.loc[newest].iloc[0]):
        df = df.drop([newest])
    return df
def plot(
    df: pd.DataFrame,
    annotate_current: bool = False,
    total_targets: tuple[int, ...] = (1500, 2500, 3500),
    alpha: float | None = None,
    landesbez_str: list[str | None] | str | None = None,
    fix_lims: bool = True,
    max_shading_date=None,
) -> "plt.Figure":
    """Draw the participation time series and return the figure.

    Args:
        df: Time-indexed frame with one column per Landesbezirk.
        annotate_current: Label the newest nationwide data point with "Jetzt".
        total_targets: Target participation counts; drawn as horizontal lines
            when *fix_lims* is set. The first one defines 100 % on the
            secondary axis.
        alpha: If given, fill the area under each curve with this opacity.
        landesbez_str: Curves to draw. Each entry is a column name of *df*;
            a ``None`` entry stands for the nationwide sum over all columns.
            ``None`` (the default) draws only the nationwide curve, and a
            single string is treated as a one-element list.
        fix_lims: Draw the target lines and pin the y-axis to the nearest
            target above the current maximum.
        max_shading_date: Last day considered for weekend shading; defaults to
            four days after the newest row of *df*.

    Returns:
        The matplotlib figure containing the plot.
    """
    # Normalise the selection: the default used to crash the loop below, and a
    # bare string would have been iterated character by character.
    if landesbez_str is None:
        landesbez_str = [None]
    elif isinstance(landesbez_str, str):
        landesbez_str = [landesbez_str]

    fig = plt.figure(dpi=300, figsize=(8.5, 5))
    target_time = pd.Timestamp("2023-10-01")
    plt.axvline(x=target_time, color="tab:green", linestyle=":")
    if fix_lims:
        for total_target in total_targets:
            plt.axhline(y=total_target, color="#48a9be", linestyle="--")

    for bez in landesbez_str:
        series = df.sum(axis=1) if bez is None else df[bez]
        # Zeros are treated like missing values so they show up as gaps.
        plot_df = series.to_frame("Digitale Befragung").replace(0, np.nan)
        plot_df = plot_df.astype({"Digitale Befragung": "float32"})
        if pd.isna(plot_df).all().item():
            continue

        observed = plot_df.dropna()
        observed_values = observed["Digitale Befragung"]
        if alpha is not None:
            plt.fill_between(
                observed.index,
                observed_values,
                color="#e4004e",
                alpha=alpha,
            )
        # Dashed line with markers through all observed points, bridging gaps.
        (line,) = plt.plot(
            observed.index,
            observed_values,
            ls="--",
            marker="o",
            lw=1,
            color="#e4004e" if bez is None or not fix_lims else None,
            markersize=4,
            label=bez if bez is not None else "Bundesweit",
        )
        if annotate_current and bez is None:
            plt.annotate(
                "Jetzt",
                (
                    observed.index[-1],
                    # Positional access: the index holds timestamps.
                    observed_values.iloc[-1] * 1.03,
                ),
                fontsize=8,
                ha="center",
            )
        # Solid line over the full series: NaN rows interrupt it, so only
        # consecutive days are connected. No label, to avoid a second legend
        # entry for the same curve.
        plt.plot(
            plot_df.index,
            plot_df["Digitale Befragung"],
            lw=1.5,
            color=line.get_color(),
        )

    plt.title("Teilnahme an Digitaler Beschäftigtenbefragung")
    plt.ylabel("# Teilnahmen")
    if fix_lims:
        # Pin the upper y-limit to the smallest target still above the current
        # maximum, or to the maximum itself once every target is exceeded.
        max_val = df.sum(axis=1).max().item()
        nearest_target = np.array(total_targets, dtype=np.float32) - max_val
        nearest_target[nearest_target <= 0] = np.inf
        idx = np.argmin(nearest_target)
        ceil_val = max(max_val, total_targets[idx])
        plt.ylim(0, ceil_val * 1.04)
    plt.legend()

    ax = plt.gca()
    # use timezone offset to center tick labels
    ax.xaxis.set_major_locator(mdates.WeekdayLocator([mdates.TU], tz="Etc/GMT+12"))
    ax.xaxis.set_minor_locator(mdates.DayLocator())
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%a %d.%m."))
    plt.grid(True, which="major", axis="y")
    plt.grid(True, which="minor", axis="x")
    ax.tick_params("x", length=0, which="major")

    # Secondary axis: counts expressed as a percentage of the first target.
    def val_to_perc(val):
        return 100 * val / total_targets[0]

    def perc_to_val(perc):
        return perc * total_targets[0] / 100

    sec_ax = ax.secondary_yaxis("right", functions=(val_to_perc, perc_to_val))
    sec_ax.set_ylabel("# Teilnahmen [% Erfolg]")
    sec_ax.yaxis.set_major_formatter(mtick.PercentFormatter())

    # Remember the limits before shading: the spans would otherwise widen them.
    xlim = plt.xlim()
    # fill weekends
    if max_shading_date is None:
        max_shading_date = df.index.max() + datetime.timedelta(days=4)
    days = pd.date_range(start="2023-08-14", end=max_shading_date)
    for day, next_day in zip(days[:-1], days[1:]):
        if day.weekday() >= 5:
            ax.axvspan(day, next_day, alpha=0.2, color="gray")
    # reset xlim
    plt.xlim((xlim[0], pd.Timestamp("2023-10-02")))
    plt.tight_layout()
    return fig
def create_fig(
    url: str = "https://beschaeftigtenbefragung.verdi.de/",
    importance_factor: float = 1.0,
    landesbez_strs: list[str | None] | None = None,
    fix_lims: bool = True,
):
    """Collect the data, draw the figure and return it with its tables.

    The live data is fetched from *url* first. If anything in that path fails,
    the exception is printed and the most recent stored spreadsheets from the
    ``data`` directory are used instead; the timestamp is then wrapped in red
    markup.

    Args:
        url: Survey page to fetch the live data from.
        importance_factor: When *landesbez_strs* is not given, a Landesbezirk
            gets its own curve if its current count is at least this fraction
            of the current total.
        landesbez_strs: Explicit list of curves to draw (``None`` entries mean
            the nationwide sum). Defaults to the nationwide curve plus every
            Landesbezirk selected via *importance_factor*.
        fix_lims: Forwarded to ``plot``.

    Returns:
        A tuple ``(figure, district_table, state_table, timestamp)``.
    """
    curr_datetime = datetime.datetime.now()
    try:
        df, df_state, curr_datetime = get_tables(url)
        df = df.sort_values(
            by=["Digitale Befragung", "Bundesland", "Bezirk"],
            ascending=[False, True, True],
        )
        df_state = df_state.sort_values("Landesbezirk")
        plot_df = create_plot_df(curr_datetime, df_state)
        annotate_current = True
        timestamp = curr_datetime.strftime("%Y-%m-%d %H:%M:%S")
    except Exception as e:
        print(e)
        data_dir = Path("data")
        # The newest stored file determines the date prefix of the fallback.
        key = sorted(data_dir.iterdir())[-1].name[:10]

        def _read_stored(file_name):
            with (data_dir / file_name).open("rb") as handle:
                stored = pd.read_excel(handle, sheet_name="digital", index_col=0)
                return stored.astype({"Digitale Befragung": "Int32"})

        df = _read_stored(f"{key}_data.ods")
        df_state = _read_stored(f"{key}_state_data.ods")
        plot_df = create_plot_df(curr_datetime, df_state)
        annotate_current = False
        timestamp = Markup(f'<font color="red">{key} 10:00:00</font>')

    total = plot_df.loc[curr_datetime].sum()
    if landesbez_strs is None:
        # Nationwide curve first, then every sufficiently large Landesbezirk.
        landesbez_strs = [None]
        for bez in plot_df.columns:
            if plot_df.loc[curr_datetime][bez] >= importance_factor * total:
                landesbez_strs.append(bez)

    figure = plot(
        plot_df,
        annotate_current=annotate_current,
        landesbez_str=landesbez_strs,
        fix_lims=fix_lims,
    )
    return figure, df, df_state, timestamp
def convert_fig_to_svg(fig: plt.Figure) -> str:
    """Render *fig* in SVG format and return the markup as a string."""
    svg_buffer = io.StringIO()
    fig.savefig(svg_buffer, format="svg")
    # The whole buffer content is the SVG document.
    return svg_buffer.getvalue()
def _print_as_html(
df: pd.DataFrame,
output_str: list[str],
df_state: pd.DataFrame | None = None,
dropna: bool = True,
) -> list[str]:
df = df.astype({"Digitale Befragung": "Int32"})
missing_df = (
df[["Digitale Befragung"]]
.isna()
.join(df[["Landesbezirk"]])
.groupby("Landesbezirk")
.sum()
)
total = df_state["Digitale Befragung"].sum() if df_state is not None else None
if df_state is not None:
for idx, row in missing_df.loc[
missing_df["Digitale Befragung"] == 1
].iterrows():
df_tmp = df.loc[df["Landesbezirk"] == idx]
df_state_tmp = df_state.loc[df_state["Landesbezirk"] == idx]
missing_idx = df_tmp.loc[df_tmp.isna().any(axis=1)].iloc[0].name
df["Digitale Befragung"].loc[missing_idx] = (
df_state_tmp["Digitale Befragung"].sum()
- df_tmp["Digitale Befragung"].sum()
)
df = df.sort_values(
["Digitale Befragung", "Landesbezirk", "Bezirk"],
ascending=[False, True, True],
)
if dropna:
df = df.dropna()
with pd.option_context("display.max_rows", None):
table = df.to_html(
index_names=False,
justify="left",
index=False,
classes="sortable dataframe",
)
tfoot = [
" <tfoot>",
" <tr>",
" <td>Gesamt</td>",
]
for i in range(len(df.columns) - 2):
tfoot.append(" <td></td>")
tfoot.extend(
[
f" <td>{df['Digitale Befragung'].sum()}</td>",
" </tr>",
]
)
if total and (diff := total - df["Digitale Befragung"].sum()):
tfoot.append(" <tr>")
num_missing = missing_df["Digitale Befragung"].sum()
tfoot.append(
f" <td>Weitere Bezirke ({num_missing})</td>"
if num_missing
else f" <td>Weitere Bezirke</td>"
)
for i in range(len(df.columns) - 2):
tfoot.append(" <td></td>")
tfoot.extend(
[
f" <td>{diff}</td>",
" </tr>",
]
)
tfoot.append(" </tfoot>")
tfoot = "\n".join(tfoot)
idx = table.index("</table>")
output_str.append(table[: idx - 1])
output_str.append(tfoot)
output_str.append(table[idx:])
return output_str
@app.route("/<state>")
@cache.cached(query_string=True)
def state_dashboard(state: str):
    """Render the dashboard page for a single Landesbezirk.

    Args:
        state: Full Landesbezirk name or one of the aliases in
            ``abbrev_dict``. Unknown names result in a 404 response.

    The optional ``importance`` query parameter is parsed as a float; a
    missing, empty or non-numeric value falls back to ``1.0`` instead of
    failing the request.
    """
    state = abbrev_dict.get(state, state)
    if state not in landesbezirk_dict.values():
        abort(404)
    # ``type=float`` makes Werkzeug return the default when the conversion
    # fails, so a malformed value no longer raises inside the view.
    importance_factor = request.args.get("importance", default=1.0, type=float)
    # The factor only matters when the curves are selected automatically; it
    # is passed along for parity with the main dashboard.
    fig, df, df_state, timestamp = create_fig(
        importance_factor=importance_factor,
        landesbez_strs=[state],
        fix_lims=False,
    )
    svg_string = convert_fig_to_svg(fig)
    # Close exactly this figure rather than whichever one is current.
    plt.close(fig)
    df["Bundesland"] = df.index.map(get_landesbezirk)
    df = df.rename(columns={"Bundesland": "Landesbezirk"})
    # Restrict both tables to the requested Landesbezirk.
    df_state = df_state.loc[df_state["Landesbezirk"] == state]
    df = df.loc[df["Landesbezirk"] == state]
    output_str = []
    output_str = _print_as_html(df_state, output_str, dropna=False)
    output_str = _print_as_html(df, output_str, df_state=df_state, dropna=False)
    return render_template(
        "base.html",
        tables="\n".join(output_str),
        timestamp=timestamp,
        image=svg_string,
    )
@app.route("/")
@cache.cached(query_string=True)
def dashboard():
    """Render the nationwide dashboard page.

    The optional ``importance`` query parameter is parsed as a float and
    forwarded to ``create_fig`` as ``importance_factor``; a missing, empty or
    non-numeric value falls back to ``1.0`` instead of failing the request.
    """
    # ``type=float`` makes Werkzeug return the default when the conversion
    # fails, so a malformed value no longer raises inside the view.
    importance_factor = request.args.get("importance", default=1.0, type=float)
    fig, df, df_state, timestamp = create_fig(importance_factor=importance_factor)
    svg_string = convert_fig_to_svg(fig)
    # Close exactly this figure rather than whichever one is current.
    plt.close(fig)
    df["Bundesland"] = df.index.map(get_landesbezirk)
    df = df.rename(columns={"Bundesland": "Landesbezirk"})
    output_str = []
    # State totals first, then the per-district table.
    output_str = _print_as_html(df_state, output_str, dropna=False)
    output_str = _print_as_html(df, output_str, df_state)
    return render_template(
        "base.html",
        tables="\n".join(output_str),
        timestamp=timestamp,
        image=svg_string,
    )
@app.route("/total")
@cache.cached(timeout=60)
def total_result(url: str = "https://beschaeftigtenbefragung.verdi.de/"):
    """Return the current nationwide participation count as plain text.

    The count is the sum of the "Digitale Befragung" column of the state
    table fetched from *url*.
    """
    _, state_table, _ = get_tables(url)
    participation_count = state_table["Digitale Befragung"].sum().item()
    return str(participation_count)
# Start the Flask application when this file is executed as a script.
if __name__ == "__main__":
    app.run()