2023-08-28 17:08:50 +02:00

264 lines
7.9 KiB
Python

import datetime
import io
from itertools import chain
from pathlib import Path
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import numpy as np
import pandas as pd
from flask import Flask, Markup, render_template, request
from flask_caching import Cache
from download_digital import construct_dataframe, get_bez_data, get_landesbezirk
# Settings handed to Flask-Caching through the Flask app config: cached
# responses are stored as files below ``cache/``. See the Flask-Caching
# documentation for the exact meaning of timeout and threshold.
config = dict(
    CACHE_TYPE="FileSystemCache",
    CACHE_DEFAULT_TIMEOUT=300,
    CACHE_THRESHOLD=50,
    CACHE_DIR="cache",
)

import locale

# Switch the whole process to the German locale so that locale-dependent
# formatting (e.g. ``%a`` weekday abbreviations) is rendered in German.
# Raises ``locale.Error`` if ``de_DE.UTF-8`` is not installed on the host.
locale.setlocale(locale.LC_ALL, "de_DE.UTF-8")

# The WSGI application and the response cache bound to it.
app = Flask(__name__)
app.config.from_mapping(config)
cache = Cache(app)
def get_tables(url: str) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Fetch the survey data from *url* and build the two result tables.

    The data sets named ``bez_data_0`` and ``bez_data_2`` are requested via
    ``get_bez_data``; each is turned into a DataFrame with
    ``construct_dataframe``.

    Returns:
        A ``(df, df_state)`` tuple: the frame built from the first data set
        (with ``special_tag="stud"``) and the frame built from the second
        one (presumably the per-state totals -- confirm against
        ``download_digital``).
    """
    raw_tables = get_bez_data(["bez_data_0", "bez_data_2"], url)
    first_raw, second_raw = raw_tables[0], raw_tables[1]
    district_frame = construct_dataframe(bez_data=first_raw, special_tag="stud")
    state_frame = construct_dataframe(bez_data=second_raw)
    return district_frame, state_frame
def _sum_by_landesbezirk(df: pd.DataFrame) -> pd.Series:
    """Return the "Digitale Befragung" counts of *df* summed per Landesbezirk.

    If *df* has no "Landesbezirk" column, one is derived from the index with
    ``get_landesbezirk``. The frame passed in is never modified.
    """
    if "Landesbezirk" not in df.columns:
        # ``assign`` returns a new frame, so the caller's data stays untouched.
        df = df.assign(Landesbezirk=df.index.map(get_landesbezirk))
    df = df.astype({"Digitale Befragung": "Int32"})
    return df.groupby("Landesbezirk")["Digitale Befragung"].sum()


def create_plot_df(
    current_df: pd.DataFrame | None,
    data_folder: str,
    sheet_name: str,
    curr_datetime: datetime.datetime,
) -> pd.DataFrame:
    """Build the time series of participation counts per Landesbezirk.

    Every file in *data_folder* is read as a spreadsheet; the first ten
    characters of its file name are used as the date of that snapshot.

    Args:
        current_df: Optional live data. When given, its per-Landesbezirk
            sums are added as an extra row labelled *curr_datetime*. The
            frame itself is not modified.
        data_folder: Directory containing the stored snapshot files.
        sheet_name: Sheet to read from each snapshot file.
        curr_datetime: End of the plotted date range and label of the live
            row.

    Returns:
        A frame with one row per day from 2023-08-15 up to *curr_datetime*
        (each stamped at 10:00) and one column per Landesbezirk; days
        without a snapshot are NaN. A trailing row whose first column is
        NaN is dropped.
    """
    data_dict = {}
    # Important: if multiple results are stored for the same date, the last
    # one wins. This relies on the Landesbezirk data being stored under a
    # file name that is lexicographically larger than the single-district
    # results.
    for f in sorted(Path(data_folder).iterdir()):
        with f.open("rb") as ff:
            snapshot = pd.read_excel(ff, sheet_name=sheet_name, index_col=0)
        key = f.name[:10]
        data_dict[key] = _sum_by_landesbezirk(snapshot)

    df = pd.DataFrame(data=data_dict).T
    # Snapshots are stamped at 10:00 of their day.
    df.index = df.index.astype("datetime64[ns]") + pd.DateOffset(hours=10)
    df = df.reindex(
        pd.date_range(start="2023-08-15", end=curr_datetime)
        + pd.DateOffset(hours=10)
    )

    if current_df is not None:
        df.loc[curr_datetime] = _sum_by_landesbezirk(current_df)

    # Drop the newest row if it carries no data. ``.iloc[0]`` is explicit
    # positional access; a bare ``[0]`` on this label-indexed row is
    # deprecated in pandas.
    newest = df.index.max()
    if pd.isna(df.loc[newest].iloc[0]):
        df = df.drop([newest])
    return df
def plot(
    current_df: pd.DataFrame | None = None,
    data_folder: str = "data",
    sheet_name: str = "digital",
    total_targets: tuple[int, ...] = (1500,),
    alpha: float | None = None,
    landesbez_str: str | None = None,
) -> str:
    """Render the participation time series as an SVG document.

    Args:
        current_df: Optional live data passed on to ``create_plot_df``; when
            given, the newest data point is annotated with "Jetzt".
        data_folder: Directory with the stored snapshots.
        sheet_name: Sheet to read from each snapshot.
        total_targets: Target participation counts. Each one is drawn as a
            horizontal line; the first one also defines the upper y-limit
            and the 100 % mark of the secondary axis.
        alpha: If given, the area below the curve is filled with this
            opacity.
        landesbez_str: Column (Landesbezirk) to plot. ``None`` plots the sum
            over all columns.

    Returns:
        The SVG markup of the figure as a string.
    """
    curr_datetime = datetime.datetime.now()
    df = create_plot_df(
        current_df=current_df,
        data_folder=data_folder,
        sheet_name=sheet_name,
        curr_datetime=curr_datetime,
    )
    column = "Digitale Befragung"
    line_color = "#e4004e"

    fig = plt.figure(dpi=300)
    try:
        ax = fig.gca()

        # fill weekends
        max_date = curr_datetime + datetime.timedelta(days=1)
        days = pd.date_range(start="2023-08-14", end=max_date)
        for day_start, day_end in zip(days[:-1], days[1:]):
            if day_start.weekday() >= 5:
                ax.axvspan(day_start, day_end, alpha=0.2, color="gray")

        series = df.sum(axis=1) if landesbez_str is None else df[landesbez_str]
        # Zero counts (e.g. the row sum of a day without any stored data)
        # are treated as missing so that they show up as gaps.
        plot_df = series.to_frame(column).replace(0, np.nan)
        plot_df = plot_df.astype({column: "float32"})

        if not pd.isna(plot_df).all().item():
            measured = plot_df.dropna()
            if alpha is not None:
                ax.fill_between(
                    measured.index,
                    measured[column],
                    color=line_color,
                    alpha=alpha,
                )
            # Dashed line with markers connects all available data points,
            # bridging days without data.
            ax.plot(
                measured.index,
                measured[column],
                ls="--",
                marker="o",
                lw=1,
                color=line_color,
                markersize=4,
                label=landesbez_str,
            )
            if current_df is not None:
                # ``.iloc[-1]`` is explicit positional access; a bare
                # ``[-1]`` on a datetime-indexed Series is deprecated.
                ax.annotate(
                    "Jetzt",
                    (measured.index[-1], measured[column].iloc[-1] * 1.03),
                    fontsize=8,
                    ha="center",
                )
            # Solid line only between consecutive days that both have data.
            ax.plot(
                plot_df.index,
                plot_df[column],
                lw=1.5,
                color=line_color,
                label=landesbez_str,
            )

        ax.set_title("Teilnahme an Digitaler Beschäftigtenbefragung")
        ax.set_ylabel("# Teilnahmen")
        ax.set_ylim(0, total_targets[0] + 100)

        # use timezone offset to center tick labels
        ax.xaxis.set_major_locator(
            mdates.WeekdayLocator([mdates.TU], tz="Etc/GMT+12")
        )
        ax.xaxis.set_minor_locator(mdates.DayLocator())
        ax.xaxis.set_major_formatter(mdates.DateFormatter("%a %d.%m."))

        ax.grid(True, which="major", axis="y")
        ax.grid(True, which="minor", axis="x")
        ax.tick_params("x", length=0, which="major")

        def val_to_perc(val):
            return 100 * val / total_targets[0]

        def perc_to_val(perc):
            return perc * total_targets[0] / 100

        # Right-hand axis shows the same data as a percentage of the first
        # target.
        sec_ax = ax.secondary_yaxis("right", functions=(val_to_perc, perc_to_val))
        sec_ax.set_ylabel("# Teilnahmen [% Erfolg]")
        sec_ax.yaxis.set_major_formatter(mtick.PercentFormatter())

        for total_target in total_targets:
            ax.axhline(y=total_target, color="#48a9be", linestyle="--")

        fig.tight_layout()

        # Convert plot to SVG image
        imgdata = io.StringIO()
        fig.savefig(imgdata, format="svg")
        return imgdata.getvalue()
    finally:
        # pyplot keeps every figure alive until it is closed explicitly;
        # without this each call would leak one figure.
        plt.close(fig)
@app.route("/")
@cache.cached()
def tables(
    url: str = "https://beschaeftigtenbefragung.verdi.de/",
):
    """Render the overview page with both result tables and the plot.

    Live data is fetched from *url*. If anything in that path fails, the
    error is logged and the page is built from the most recent snapshot in
    the ``data`` directory instead; the timestamp is then shown in red.

    Returns:
        The rendered ``base.html`` template.
    """
    count_dtype = {"Digitale Befragung": "Int32"}

    def _table_html(df: pd.DataFrame) -> str:
        """Return *df* as an HTML table with a "Gesamt" footer row."""
        df = df.astype(count_dtype)
        with pd.option_context("display.max_rows", None):
            table = df.to_html(
                index_names=False,
                justify="left",
                index=False,
                classes="sortable dataframe",
            )
        # Footer: label in the first cell, total in the last cell, empty
        # cells in between (none if the frame has two columns or fewer).
        filler_cells = [" <td></td>"] * (len(df.columns) - 2)
        tfoot = "\n".join(
            [
                " <tfoot>",
                " <tr>",
                " <td>Gesamt</td>",
                *filler_cells,
                f" <td>{df['Digitale Befragung'].sum()}</td>",
                " </tr>",
                " </tfoot>",
            ]
        )
        # Splice the footer in right before the closing tag; ``idx - 1``
        # drops the character preceding ``</table>`` because the join adds
        # its own newline.
        idx = table.index("</table>")
        return "\n".join([table[: idx - 1], tfoot, table[idx:]])

    try:
        df, df_state = get_tables(url)
        df = df.sort_values(
            ["Digitale Befragung", "Bundesland", "Bezirk"],
            ascending=[False, True, True],
        )
        df_state = df_state.sort_values("Landesbezirk")
        image = plot(df_state)
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    except Exception:
        # Deliberate best-effort boundary: keep serving the page from stored
        # data, but record the full traceback instead of only the message.
        app.logger.exception(
            "Fetching live data from %s failed; using latest stored snapshot",
            url,
        )
        data_dir = Path("data")
        last_file = sorted(data_dir.iterdir())[-1]
        key = last_file.name[:10]
        with (data_dir / f"{key}_data.ods").open("rb") as ff:
            df = pd.read_excel(ff, sheet_name="digital", index_col=0).astype(
                count_dtype
            )
        with (data_dir / f"{key}_state_data.ods").open("rb") as ff:
            df_state = pd.read_excel(ff, sheet_name="digital", index_col=0).astype(
                count_dtype
            )
        image = plot()
        timestamp = Markup(f'<font color="red">{key} 10:00:00</font>')

    html_tables = [_table_html(df_state), _table_html(df)]
    return render_template(
        "base.html",
        tables="\n".join(html_tables),
        timestamp=timestamp,
        image=image,
    )
# Run the Flask application when this file is executed directly.
if __name__ == "__main__":
    app.run()