334 lines
9.9 KiB
Python

import datetime
import io
import locale
import os
import time
from itertools import chain
from pathlib import Path
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import numpy as np
import pandas as pd
from flask import Flask, Markup, render_template, request
from flask_caching import Cache
from download_digital import construct_dataframe, get_bez_data, get_landesbezirk
config = {
"CACHE_TYPE": "FileSystemCache",
"CACHE_DEFAULT_TIMEOUT": 300,
"CACHE_THRESHOLD": 50,
"CACHE_DIR": "cache",
}
os.environ["TZ"] = "Europe/Berlin"
time.tzset()
locale.setlocale(locale.LC_ALL, "de_DE.UTF-8")
app = Flask(__name__)
app.config.from_mapping(config)
cache = Cache(app)
def get_tables(url: str) -> tuple[pd.DataFrame, pd.DataFrame]:
bez_data = get_bez_data(["bez_data_0", "bez_data_2"], url)
df = construct_dataframe(bez_data=bez_data[0], special_tag="stud")
df_state = construct_dataframe(bez_data=bez_data[1])
return df, df_state
def create_plot_df(
curr_datetime,
current_df: pd.DataFrame | None,
data_folder: str = "data",
sheet_name: str = "digital",
) -> pd.DataFrame:
data_dict = {}
## Important: If multiple results are stored for the same date
## the last is used. So this relies on the Landesbezirk data
## to be stored with a filename that is lexigraphically larger
## than the single district results.
for f in sorted(Path(data_folder).iterdir()):
with f.open("rb") as ff:
df = pd.read_excel(ff, sheet_name=sheet_name, index_col=0)
if "Landesbezirk" not in df.columns:
df["Landesbezirk"] = df.index.map(get_landesbezirk)
df = df.astype({"Digitale Befragung": "Int32"})
df = df.groupby("Landesbezirk")[["Digitale Befragung"]].sum()
key = f.name[:10]
data_dict[key] = df["Digitale Befragung"]
df = pd.DataFrame(data=data_dict).T
df.index = df.index.astype("datetime64[ns]") + pd.DateOffset(hours=10)
df = df.reindex(
pd.date_range(start="2023-08-15", end=curr_datetime) + pd.DateOffset(hours=10)
)
if current_df is not None:
if "Landesbezirk" not in current_df.columns:
current_df["Landesbezirk"] = current_df.index.map(get_landesbezirk)
current_df = current_df.astype({"Digitale Befragung": "Int32"})
current_df = current_df.groupby("Landesbezirk")[["Digitale Befragung"]].sum()
df.loc[curr_datetime] = current_df["Digitale Befragung"]
if pd.isna(df.loc[df.index.max()][0]):
df = df.drop([df.index.max()])
return df
def plot(
curr_datetime,
df: pd.DataFrame,
annotate_current: bool = False,
total_targets: tuple[int, ...] = (1500, 2500, 3500),
alpha: float | None = None,
landesbez_str: str | None = None,
) -> str:
fig = plt.figure(dpi=300)
# fill weekends
max_date = curr_datetime + datetime.timedelta(days=1)
days = pd.date_range(start="2023-08-14", end=max_date)
for idx, day in enumerate(days[:-1]):
if day.weekday() >= 5:
plt.gca().axvspan(days[idx], days[idx + 1], alpha=0.2, color="gray")
for bez in landesbez_str:
series = df.sum(axis=1) if bez is None else df[bez]
plot_df = series.to_frame("Digitale Befragung").replace(0, np.nan)
plot_df = plot_df.astype({"Digitale Befragung": "float32"})
if not pd.isna(plot_df).all().item():
if alpha is not None:
plt.fill_between(
plot_df.dropna().index,
plot_df.dropna()["Digitale Befragung"],
color="#e4004e",
alpha=alpha,
)
(line,) = plt.plot(
plot_df.dropna().index,
plot_df.dropna()["Digitale Befragung"],
ls="--",
marker="o",
lw=1,
color="#e4004e" if bez is None else None,
markersize=4,
label=bez if bez is not None else "Bundesweit",
)
if annotate_current and bez is None:
plt.annotate(
"Jetzt",
(
plot_df.dropna().index[-1],
plot_df.dropna()["Digitale Befragung"][-1] * 1.03,
),
fontsize=8,
ha="center",
)
plt.plot(
plot_df.index,
plot_df["Digitale Befragung"],
lw=1.5,
color=line.get_color(),
# label=bez,
)
plt.title("Teilnahme an Digitaler Beschäftigtenbefragung")
plt.ylabel("# Teilnahmen")
max_val = df.sum(axis=1).max().item()
nearest_target = np.array(total_targets, dtype=np.float32) - max_val
nearest_target[nearest_target <= 0] = np.inf
idx = np.argmin(nearest_target)
ceil_val = max(max_val, total_targets[idx])
plt.ylim(0, ceil_val * 1.025)
plt.legend()
# use timezone offset to center tick labels
plt.gca().xaxis.set_major_locator(
mdates.WeekdayLocator([mdates.TU], tz="Etc/GMT+12")
)
plt.gca().xaxis.set_minor_locator(mdates.DayLocator())
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%a %d.%m."))
plt.grid(True, which="major", axis="y")
plt.grid(True, which="minor", axis="x")
plt.gca().tick_params("x", length=0, which="major")
def val_to_perc(val):
return 100 * val / total_targets[0]
def perc_to_val(perc):
return perc * total_targets[0] / 100
sec_ax = plt.gca().secondary_yaxis("right", functions=(val_to_perc, perc_to_val))
sec_ax.set_ylabel("# Teilnahmen [% Erfolg]")
sec_ax.yaxis.set_major_formatter(mtick.PercentFormatter())
for total_target in total_targets:
plt.axhline(y=total_target, color="#48a9be", linestyle="--")
plt.tight_layout()
return fig
def create_fig(
url: str = "https://beschaeftigtenbefragung.verdi.de/",
importance_factor: float = 1.0,
):
curr_datetime = datetime.datetime.now()
try:
df, df_state = get_tables(url)
df = df.sort_values(
["Digitale Befragung", "Bundesland", "Bezirk"],
ascending=[False, True, True],
)
df_state = df_state.sort_values("Landesbezirk")
plot_df = create_plot_df(curr_datetime, df_state)
annotate_current = True
timestamp = curr_datetime.strftime("%Y-%m-%d %H:%M:%S")
except Exception as e:
print(e)
last_file = sorted(Path("data").iterdir())[-1]
key = last_file.name[:10]
with (Path("data") / f"{key}_data.ods").open("rb") as ff:
df = pd.read_excel(ff, sheet_name="digital", index_col=0).astype(
{"Digitale Befragung": "Int32"}
)
with (Path("data") / f"{key}_state_data.ods").open("rb") as ff:
df_state = pd.read_excel(ff, sheet_name="digital", index_col=0).astype(
{"Digitale Befragung": "Int32"}
)
plot_df = create_plot_df(curr_datetime)
annotate_current = False
timestamp = Markup(f'<font color="red">{key} 10:00:00</font>')
total = plot_df.loc[curr_datetime].sum()
landesbez_strs = [None] + [
bez
for bez in plot_df.columns
if plot_df.loc[curr_datetime][bez] >= importance_factor * total
]
return (
plot(
curr_datetime,
plot_df,
annotate_current=annotate_current,
landesbez_str=landesbez_strs,
),
df,
df_state,
timestamp,
)
def convert_fig_to_svg(fig: plt.Figure) -> str:
# Convert plot to SVG image
imgdata = io.StringIO()
fig.savefig(imgdata, format="svg")
imgdata.seek(0) # rewind the data
return imgdata.read()
@app.route("/")
@cache.cached(query_string=True)
def tables():
def _print_as_html(df: pd.DataFrame, total: int | None = None) -> None:
df = df.astype({"Digitale Befragung": "Int32"})
df = df.dropna()
with pd.option_context("display.max_rows", None):
table = df.to_html(
index_names=False,
justify="left",
index=False,
classes="sortable dataframe",
)
tfoot = [
" <tfoot>",
" <tr>",
" <td>Gesamt</td>",
]
for i in range(len(df.columns) - 2):
tfoot.append(" <td></td>")
tfoot.extend(
[
f" <td>{df['Digitale Befragung'].sum()}</td>",
" </tr>",
]
)
if total:
tfoot.extend([
" <tr>",
" <td>Weitere Bezirke</td>",
])
for i in range(len(df.columns) - 2):
tfoot.append(" <td></td>")
tfoot.extend(
[
f" <td>{total - df['Digitale Befragung'].sum()}</td>",
" </tr>",
]
)
tfoot.append(" </tfoot>")
tfoot = "\n".join(tfoot)
idx = table.index("</table>")
output_str.append(table[: idx - 1])
output_str.append(tfoot)
output_str.append(table[idx:])
importance_factor = request.args.get("importance")
if not importance_factor:
importance_factor = 1.0
else:
importance_factor = float(importance_factor)
output_str = []
fig, df, df_state, timestamp = create_fig(importance_factor=importance_factor)
svg_string = convert_fig_to_svg(fig)
plt.close()
_print_as_html(df_state)
_print_as_html(df, total=df_state['Digitale Befragung'].sum())
return render_template(
"base.html",
tables="\n".join(output_str),
timestamp=timestamp,
image=svg_string,
)
if __name__ == "__main__":
app.run()