"""Flask dashboard showing participation in the ver.di digital employee survey."""
import datetime
|
|
import io
|
|
import locale
|
|
import os
|
|
import time
|
|
from itertools import chain
|
|
from pathlib import Path
|
|
|
|
import matplotlib.dates as mdates
|
|
import matplotlib.pyplot as plt
|
|
import matplotlib.ticker as mtick
|
|
import numpy as np
|
|
import pandas as pd
|
|
from flask import Flask, Markup, abort, render_template, request
|
|
from flask_caching import Cache
|
|
|
|
from download_digital import (
|
|
construct_dataframe,
|
|
get_bez_data,
|
|
get_landesbezirk,
|
|
landesbezirk_dict,
|
|
)
|
|
|
|
# Flask-Caching settings; applied to the Flask app via ``app.config.from_mapping``.
config = dict(
    CACHE_TYPE="FileSystemCache",
    CACHE_DEFAULT_TIMEOUT=300,
    CACHE_THRESHOLD=50,
    CACHE_DIR="cache",
)
|
|
|
|
# Short URL aliases -> full Landesbezirk names (used by the per-state route).
abbrev_dict = dict(
    [
        ("BBR", "Berlin-Brandenburg"),
        ("BaWü", "Baden-Württemberg"),
        ("NDS", "Niedersachsen-Bremen"),
        ("NRW", "Nordrhein-Westfalen"),
        ("RLP", "Rheinland-Pfalz-Saarland"),
        ("SAT", "Sachsen, Sachsen-Anhalt, Thüringen"),
    ]
)
|
|
|
|
# Interpret local times as German time for the whole process.
os.environ["TZ"] = "Europe/Berlin"
# time.tzset() only exists on Unix; skip it elsewhere instead of failing at import.
if hasattr(time, "tzset"):
    time.tzset()

# German locale for the whole process. Raises locale.Error if "de_DE.UTF-8"
# is not installed on the host.
locale.setlocale(locale.LC_ALL, "de_DE.UTF-8")
app = Flask(__name__)
app.config.from_mapping(config)
cache = Cache(app)
|
|
|
|
|
|
def get_tables(url: str) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Fetch the survey data from *url* and return two data frames.

    The first frame is built from the ``bez_data_0`` payload (with
    ``special_tag="stud"``), the second from the ``bez_data_2`` payload.
    Callers in this module treat them as the per-district and the
    per-Landesbezirk table respectively.
    """
    payloads = get_bez_data(["bez_data_0", "bez_data_2"], url)
    return (
        construct_dataframe(bez_data=payloads[0], special_tag="stud"),
        construct_dataframe(bez_data=payloads[1]),
    )
|
|
|
|
|
|
def _sum_by_landesbezirk(df: pd.DataFrame) -> pd.Series:
    """Return the "Digitale Befragung" totals per Landesbezirk without mutating *df*."""
    if "Landesbezirk" not in df.columns:
        # assign() returns a copy, so the caller's frame is left untouched.
        df = df.assign(Landesbezirk=df.index.map(get_landesbezirk))
    df = df.astype({"Digitale Befragung": "Int32"})
    return df.groupby("Landesbezirk")["Digitale Befragung"].sum()


def create_plot_df(
    curr_datetime,
    current_df: pd.DataFrame | None = None,
    data_folder: str = "data",
    sheet_name: str = "digital",
) -> pd.DataFrame:
    """Build the participation time series (rows: timestamps, columns: Landesbezirke).

    Every file in *data_folder* is read as a spreadsheet; the first ten
    characters of its file name are used as the date of the snapshot.  The
    snapshots are placed at 10:00 of their date and reindexed to a daily
    range starting 2023-08-15, so days without a file become all-NaN rows.

    Args:
        curr_datetime: Timestamp under which *current_df* is appended.
        current_df: Optional live data to append as the newest row.  ``None``
            (the default) returns the stored history only.
        data_folder: Directory containing the stored snapshots.
        sheet_name: Sheet to read from every snapshot file.

    Returns:
        The combined data frame.  A newest row whose first column is NaN is
        dropped.
    """
    data_dict = {}

    # Important: if multiple results are stored for the same date, the last
    # one is used. So this relies on the Landesbezirk data being stored with
    # a filename that is lexicographically larger than the single district
    # results.
    for f in sorted(Path(data_folder).iterdir()):
        with f.open("rb") as ff:
            snapshot = pd.read_excel(ff, sheet_name=sheet_name, index_col=0)
        data_dict[f.name[:10]] = _sum_by_landesbezirk(snapshot)

    df = pd.DataFrame(data=data_dict).T
    max_date = df.index.max()

    df.index = df.index.astype("datetime64[ns]") + pd.DateOffset(hours=10)

    df = df.reindex(
        pd.date_range(start="2023-08-15", end=max_date) + pd.DateOffset(hours=10)
    )

    if current_df is not None:
        df.loc[curr_datetime] = _sum_by_landesbezirk(current_df)

    newest = df.index.max()
    # .iloc[0]: explicit positional access (Series[0] on a labelled Series is deprecated).
    if pd.isna(df.loc[newest].iloc[0]):
        df = df.drop([newest])

    return df
|
|
|
|
|
|
def plot(
    df: pd.DataFrame,
    annotate_current: bool = False,
    total_targets: tuple[int, ...] = (1500, 2500, 3500),
    alpha: float | None = None,
    landesbez_str: str | list[str | None] | None = None,
    fix_lims: bool = True,
    max_shading_date=None,
) -> plt.Figure:
    """Plot the participation time series and return the matplotlib figure.

    Args:
        df: Time series with one column per Landesbezirk (datetime index).
        annotate_current: Label the newest nationwide data point with "Jetzt".
        total_targets: Participation targets.  With *fix_lims* they are drawn
            as horizontal lines and determine the y-limit; the first target
            defines 100 % on the secondary axis.
        alpha: If given, fill the area under each curve with this opacity.
        landesbez_str: Columns to plot.  ``None`` inside the list stands for
            the nationwide sum.  A bare ``None`` plots only the nationwide
            sum; a single string plots only that column.
        fix_lims: Draw target lines and fix the y-axis to the next target.
        max_shading_date: Last day for weekend shading; defaults to three
            days after the newest row of *df*.

    Returns:
        The created figure.  The caller is responsible for closing it.
    """
    fig = plt.figure(dpi=300)

    if fix_lims:
        for total_target in total_targets:
            plt.axhline(y=total_target, color="#48a9be", linestyle="--")

    # Normalise the selection: the original default (None) and a bare string
    # would otherwise fail or be iterated character by character.
    if landesbez_str is None:
        selection = [None]
    elif isinstance(landesbez_str, str):
        selection = [landesbez_str]
    else:
        selection = list(landesbez_str)

    for bez in selection:
        series = df.sum(axis=1) if bez is None else df[bez]
        # Zeros are treated as "no data" so that they leave gaps in the curve.
        plot_df = series.to_frame("Digitale Befragung").replace(0, np.nan)
        plot_df = plot_df.astype({"Digitale Befragung": "float32"})
        observed = plot_df["Digitale Befragung"].dropna()
        if observed.empty:
            continue

        if alpha is not None:
            plt.fill_between(
                observed.index,
                observed,
                color="#e4004e",
                alpha=alpha,
            )

        # Dashed line with markers through all observed points (bridges gaps).
        (line,) = plt.plot(
            observed.index,
            observed,
            ls="--",
            marker="o",
            lw=1,
            color="#e4004e" if bez is None or not fix_lims else None,
            markersize=4,
            label=bez if bez is not None else "Bundesweit",
        )

        if annotate_current and bez is None:
            plt.annotate(
                "Jetzt",
                (observed.index[-1], observed.iloc[-1] * 1.03),
                fontsize=8,
                ha="center",
            )

        # Solid line in the same colour; NaN rows interrupt it.
        plt.plot(
            plot_df.index,
            plot_df["Digitale Befragung"],
            lw=1.5,
            color=line.get_color(),
        )

    plt.title("Teilnahme an Digitaler Beschäftigtenbefragung")
    plt.ylabel("# Teilnahmen")

    if fix_lims:
        max_val = df.sum(axis=1).max().item()
        # Upper limit: the smallest target above the current maximum, or the
        # maximum itself once every target has been reached.
        open_targets = [target for target in total_targets if target > max_val]
        ceil_val = min(open_targets) if open_targets else max_val
        plt.ylim(0, ceil_val * 1.025)
    plt.legend()

    ax = plt.gca()

    # use timezone offset to center tick labels
    ax.xaxis.set_major_locator(mdates.WeekdayLocator([mdates.TU], tz="Etc/GMT+12"))
    ax.xaxis.set_minor_locator(mdates.DayLocator())
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%a %d.%m."))

    plt.grid(True, which="major", axis="y")
    plt.grid(True, which="minor", axis="x")

    ax.tick_params("x", length=0, which="major")

    if total_targets:
        # Secondary axis: participation as a percentage of the first target.
        def val_to_perc(val):
            return 100 * val / total_targets[0]

        def perc_to_val(perc):
            return perc * total_targets[0] / 100

        sec_ax = ax.secondary_yaxis("right", functions=(val_to_perc, perc_to_val))
        sec_ax.set_ylabel("# Teilnahmen [% Erfolg]")
        sec_ax.yaxis.set_major_formatter(mtick.PercentFormatter())

    # Remember the limits: the weekend shading below would widen them.
    xlim = plt.xlim()

    # fill weekends
    if max_shading_date is None:
        max_shading_date = df.index.max() + datetime.timedelta(days=3)
    days = pd.date_range(start="2023-08-14", end=max_shading_date)
    for day_start, day_end in zip(days[:-1], days[1:]):
        if day_start.weekday() >= 5:
            ax.axvspan(day_start, day_end, alpha=0.2, color="gray")

    # reset xlim
    plt.xlim(xlim)

    plt.tight_layout()

    return fig
|
|
|
|
|
|
def create_fig(
    url: str = "https://beschaeftigtenbefragung.verdi.de/",
    importance_factor: float = 1.0,
    landesbez_strs: list[str | None] | None = None,
    fix_lims: bool = True,
) -> tuple[plt.Figure, pd.DataFrame, pd.DataFrame, str]:
    """Create the participation figure together with the tables shown below it.

    The live data is downloaded from *url*.  If anything in that path fails,
    the newest snapshot stored in ``data/`` is used instead and the returned
    timestamp is rendered in red to signal stale data.

    Args:
        url: Address of the survey page to download from.
        importance_factor: Only used when *landesbez_strs* is ``None``: a
            Landesbezirk gets its own curve if its newest count is at least
            ``importance_factor`` times the newest nationwide total.
        landesbez_strs: Explicit list of curves to draw (``None`` entry =
            nationwide sum).  ``None`` selects the curves automatically.
        fix_lims: Forwarded to :func:`plot`.

    Returns:
        ``(figure, district table, Landesbezirk table, timestamp)``.
    """
    curr_datetime = datetime.datetime.now()
    try:
        df, df_state = get_tables(url)

        df = df.sort_values(
            ["Digitale Befragung", "Bundesland", "Bezirk"],
            ascending=[False, True, True],
        )

        df_state = df_state.sort_values("Landesbezirk")

        plot_df = create_plot_df(curr_datetime, df_state)
        annotate_current = True
        timestamp = curr_datetime.strftime("%Y-%m-%d %H:%M:%S")

    except Exception:
        # Deliberately broad: any failure of the live path (network, parsing,
        # ...) falls back to the stored data.  Log it with traceback.
        app.logger.exception("Live data unavailable, falling back to stored snapshot")
        last_file = sorted(Path("data").iterdir())[-1]
        key = last_file.name[:10]

        with (Path("data") / f"{key}_data.ods").open("rb") as ff:
            df = pd.read_excel(ff, sheet_name="digital", index_col=0).astype(
                {"Digitale Befragung": "Int32"}
            )
        with (Path("data") / f"{key}_state_data.ods").open("rb") as ff:
            df_state = pd.read_excel(ff, sheet_name="digital", index_col=0).astype(
                {"Digitale Befragung": "Int32"}
            )

        # No live data: pass None explicitly (the argument was missing before).
        plot_df = create_plot_df(curr_datetime, None)
        annotate_current = False
        timestamp = Markup(f'<font color="red">{key} 10:00:00</font>')

    if landesbez_strs is None:
        # The row for "now" only exists on the live path (and may have been
        # dropped if it was empty); otherwise use the newest stored row.
        if curr_datetime in plot_df.index:
            latest = plot_df.loc[curr_datetime]
        else:
            latest = plot_df.iloc[-1]
        threshold = importance_factor * latest.sum()
        landesbez_strs = [None] + [
            bez
            for bez in plot_df.columns
            # Missing values never qualify (and NA cannot be used as a bool).
            if pd.notna(latest[bez]) and latest[bez] >= threshold
        ]

    fig = plot(
        plot_df,
        annotate_current=annotate_current,
        landesbez_str=landesbez_strs,
        fix_lims=fix_lims,
    )
    return fig, df, df_state, timestamp
|
|
|
|
|
|
def convert_fig_to_svg(fig: plt.Figure) -> str:
    """Render *fig* as SVG and return the markup as a string."""
    buffer = io.StringIO()
    fig.savefig(buffer, format="svg")
    # getvalue() returns the whole buffer regardless of the stream position.
    return buffer.getvalue()
|
|
|
|
|
|
def _print_as_html(
|
|
df: pd.DataFrame,
|
|
output_str: list[str],
|
|
total: int | None = None,
|
|
dropna: bool = True,
|
|
) -> list[str]:
|
|
df = df.astype({"Digitale Befragung": "Int32"})
|
|
if dropna:
|
|
df = df.dropna()
|
|
with pd.option_context("display.max_rows", None):
|
|
table = df.to_html(
|
|
index_names=False,
|
|
justify="left",
|
|
index=False,
|
|
classes="sortable dataframe",
|
|
)
|
|
|
|
tfoot = [
|
|
" <tfoot>",
|
|
" <tr>",
|
|
" <td>Gesamt</td>",
|
|
]
|
|
for i in range(len(df.columns) - 2):
|
|
tfoot.append(" <td></td>")
|
|
tfoot.extend(
|
|
[
|
|
f" <td>{df['Digitale Befragung'].sum()}</td>",
|
|
" </tr>",
|
|
]
|
|
)
|
|
if total and (diff := total - df["Digitale Befragung"].sum()):
|
|
tfoot.extend(
|
|
[
|
|
" <tr>",
|
|
" <td>Weitere Bezirke</td>",
|
|
]
|
|
)
|
|
for i in range(len(df.columns) - 2):
|
|
tfoot.append(" <td></td>")
|
|
tfoot.extend(
|
|
[
|
|
f" <td>{diff}</td>",
|
|
" </tr>",
|
|
]
|
|
)
|
|
tfoot.append(" </tfoot>")
|
|
|
|
tfoot = "\n".join(tfoot)
|
|
idx = table.index("</table>")
|
|
output_str.append(table[: idx - 1])
|
|
output_str.append(tfoot)
|
|
output_str.append(table[idx:])
|
|
return output_str
|
|
|
|
|
|
@app.route("/<state>")
@cache.cached(query_string=True)
def state_dashboard(state: str):
    """Render the dashboard for a single Landesbezirk.

    *state* may be a full Landesbezirk name or one of the abbreviations in
    ``abbrev_dict``.  Unknown names yield 404; a non-numeric ``importance``
    query parameter yields 400.
    """
    state = abbrev_dict.get(state, state)

    if state not in landesbezirk_dict.values():
        abort(404)

    raw_importance = request.args.get("importance")
    try:
        importance_factor = float(raw_importance) if raw_importance else 1.0
    except ValueError:
        # Malformed client input is a client error, not a server error.
        abort(400)

    # importance_factor has no effect on the curve selection here because the
    # curve list is given explicitly; it is forwarded for parity with "/".
    fig, df, df_state, timestamp = create_fig(
        importance_factor=importance_factor,
        landesbez_strs=[state],
        fix_lims=False,
    )
    try:
        svg_string = convert_fig_to_svg(fig)
    finally:
        # Close exactly this figure, even if rendering fails.
        plt.close(fig)

    df["Bundesland"] = df.index.map(get_landesbezirk)
    df = df.rename(columns={"Bundesland": "Landesbezirk"})

    # Restrict both tables to the requested Landesbezirk.
    df_state = df_state.loc[df_state["Landesbezirk"] == state]
    df = df.loc[df["Landesbezirk"] == state]

    output_str = []
    output_str = _print_as_html(df_state, output_str, dropna=False)
    output_str = _print_as_html(
        df, output_str, total=df_state["Digitale Befragung"].sum(), dropna=False
    )

    return render_template(
        "base.html",
        tables="\n".join(output_str),
        timestamp=timestamp,
        image=svg_string,
    )
|
|
|
|
|
|
@app.route("/")
@cache.cached(query_string=True)
def dashboard():
    """Render the nationwide dashboard.

    The optional ``importance`` query parameter is forwarded to
    ``create_fig`` as ``importance_factor`` (default 1.0).  A non-numeric
    value yields 400.
    """
    raw_importance = request.args.get("importance")
    try:
        importance_factor = float(raw_importance) if raw_importance else 1.0
    except ValueError:
        # Malformed client input is a client error, not a server error.
        abort(400)

    fig, df, df_state, timestamp = create_fig(importance_factor=importance_factor)
    try:
        svg_string = convert_fig_to_svg(fig)
    finally:
        # Close exactly this figure, even if rendering fails.
        plt.close(fig)

    df["Bundesland"] = df.index.map(get_landesbezirk)
    df = df.rename(columns={"Bundesland": "Landesbezirk"})

    output_str = []
    output_str = _print_as_html(df_state, output_str, dropna=False)
    output_str = _print_as_html(
        df, output_str, total=df_state["Digitale Befragung"].sum()
    )

    return render_template(
        "base.html",
        tables="\n".join(output_str),
        timestamp=timestamp,
        image=svg_string,
    )
|
|
|
|
|
|
# Start Flask's built-in server only when this file is executed directly.
if __name__ == "__main__":
    app.run()
|