296 lines
8.8 KiB
Python
296 lines
8.8 KiB
Python
import datetime
|
|
import io
|
|
import locale
|
|
import os
|
|
import time
|
|
from itertools import chain
|
|
from pathlib import Path
|
|
|
|
import matplotlib.dates as mdates
|
|
import matplotlib.pyplot as plt
|
|
import matplotlib.ticker as mtick
|
|
import numpy as np
|
|
import pandas as pd
|
|
from flask import Flask, Markup, render_template, request
|
|
from flask_caching import Cache
|
|
|
|
from download_digital import construct_dataframe, get_bez_data, get_landesbezirk
|
|
|
|
# Flask-Caching settings: file-system backed cache stored in ``cache``.
# Per the Flask-Caching documentation the timeout is given in seconds and the
# threshold is the maximum number of cached items.
config = dict(
    CACHE_TYPE="FileSystemCache",
    CACHE_DEFAULT_TIMEOUT=300,
    CACHE_THRESHOLD=50,
    CACHE_DIR="cache",
)

# Pin the process time zone; ``tzset`` makes the C library re-read ``TZ`` so
# that naive ``datetime.now()`` values are Berlin local time.
os.environ["TZ"] = "Europe/Berlin"
time.tzset()

# German locale for the whole process (presumably for the ``%a`` weekday
# abbreviations used by the plot's date formatter -- confirm).
locale.setlocale(locale.LC_ALL, "de_DE.UTF-8")

# Application object and its cache, configured from the mapping above.
app = Flask(__name__)
app.config.from_mapping(config)
cache = Cache(app)
|
|
|
|
|
|
def get_tables(url: str) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Fetch the survey data from *url* and build the two result tables.

    Args:
        url: Address handed to ``get_bez_data``.

    Returns:
        A ``(df, df_state)`` tuple. ``df`` is constructed from the
        ``"bez_data_0"`` dataset with ``special_tag="stud"``; ``df_state`` is
        constructed from the ``"bez_data_2"`` dataset.
    """
    raw = get_bez_data(["bez_data_0", "bez_data_2"], url)
    return (
        construct_dataframe(bez_data=raw[0], special_tag="stud"),
        construct_dataframe(bez_data=raw[1]),
    )
|
|
|
|
|
|
def create_plot_df(
|
|
curr_datetime,
|
|
current_df: pd.DataFrame | None,
|
|
data_folder: str = "data",
|
|
sheet_name: str = "digital",
|
|
) -> pd.DataFrame:
|
|
data_dict = {}
|
|
|
|
## Important: If multiple results are stored for the same date
|
|
## the last is used. So this relies on the Landesbezirk data
|
|
## to be stored with a filename that is lexigraphically larger
|
|
## than the single district results.
|
|
|
|
for f in sorted(Path(data_folder).iterdir()):
|
|
with f.open("rb") as ff:
|
|
df = pd.read_excel(ff, sheet_name=sheet_name, index_col=0)
|
|
|
|
if "Landesbezirk" not in df.columns:
|
|
df["Landesbezirk"] = df.index.map(get_landesbezirk)
|
|
|
|
df = df.astype({"Digitale Befragung": "Int32"})
|
|
df = df.groupby("Landesbezirk")[["Digitale Befragung"]].sum()
|
|
|
|
key = f.name[:10]
|
|
data_dict[key] = df["Digitale Befragung"]
|
|
|
|
df = pd.DataFrame(data=data_dict).T
|
|
|
|
df.index = df.index.astype("datetime64[ns]") + pd.DateOffset(hours=10)
|
|
|
|
df = df.reindex(
|
|
pd.date_range(start="2023-08-15", end=curr_datetime) + pd.DateOffset(hours=10)
|
|
)
|
|
|
|
if current_df is not None:
|
|
if "Landesbezirk" not in current_df.columns:
|
|
current_df["Landesbezirk"] = current_df.index.map(get_landesbezirk)
|
|
current_df = current_df.astype({"Digitale Befragung": "Int32"})
|
|
current_df = current_df.groupby("Landesbezirk")[["Digitale Befragung"]].sum()
|
|
|
|
df.loc[curr_datetime] = current_df["Digitale Befragung"]
|
|
|
|
if pd.isna(df.loc[df.index.max()][0]):
|
|
df = df.drop([df.index.max()])
|
|
|
|
return df
|
|
|
|
|
|
def plot(
|
|
curr_datetime,
|
|
df: pd.DataFrame,
|
|
annotate_current: bool = False,
|
|
total_targets: tuple[int, ...] = (1500,),
|
|
alpha: float | None = None,
|
|
landesbez_str: str | None = None,
|
|
) -> str:
|
|
fig = plt.figure(dpi=300)
|
|
|
|
# fill weekends
|
|
max_date = curr_datetime + datetime.timedelta(days=1)
|
|
days = pd.date_range(start="2023-08-14", end=max_date)
|
|
for idx, day in enumerate(days[:-1]):
|
|
if day.weekday() >= 5:
|
|
plt.gca().axvspan(days[idx], days[idx + 1], alpha=0.2, color="gray")
|
|
|
|
for bez in landesbez_str:
|
|
series = df.sum(axis=1) if bez is None else df[bez]
|
|
plot_df = series.to_frame("Digitale Befragung").replace(0, np.nan)
|
|
plot_df = plot_df.astype({"Digitale Befragung": "float32"})
|
|
if not pd.isna(plot_df).all().item():
|
|
if alpha is not None:
|
|
plt.fill_between(
|
|
plot_df.dropna().index,
|
|
plot_df.dropna()["Digitale Befragung"],
|
|
color="#e4004e",
|
|
alpha=alpha,
|
|
)
|
|
|
|
(line,) = plt.plot(
|
|
plot_df.dropna().index,
|
|
plot_df.dropna()["Digitale Befragung"],
|
|
ls="--",
|
|
marker="o",
|
|
lw=1,
|
|
color="#e4004e" if bez is None else None,
|
|
markersize=4,
|
|
label=bez if bez is not None else "Bundesweit",
|
|
)
|
|
|
|
if annotate_current and bez is None:
|
|
plt.annotate(
|
|
"Jetzt",
|
|
(
|
|
plot_df.dropna().index[-1],
|
|
plot_df.dropna()["Digitale Befragung"][-1] * 1.03,
|
|
),
|
|
fontsize=8,
|
|
ha="center",
|
|
)
|
|
|
|
plt.plot(
|
|
plot_df.index,
|
|
plot_df["Digitale Befragung"],
|
|
lw=1.5,
|
|
color=line.get_color(),
|
|
# label=bez,
|
|
)
|
|
|
|
plt.title("Teilnahme an Digitaler Beschäftigtenbefragung")
|
|
plt.ylabel("# Teilnahmen")
|
|
plt.ylim(0, total_targets[0] + 100)
|
|
plt.legend()
|
|
|
|
# use timezone offset to center tick labels
|
|
plt.gca().xaxis.set_major_locator(
|
|
mdates.WeekdayLocator([mdates.TU], tz="Etc/GMT+12")
|
|
)
|
|
plt.gca().xaxis.set_minor_locator(mdates.DayLocator())
|
|
plt.gca().xaxis.set_major_formatter(mdates.DateFormatter("%a %d.%m."))
|
|
|
|
plt.grid(True, which="major", axis="y")
|
|
plt.grid(True, which="minor", axis="x")
|
|
|
|
plt.gca().tick_params("x", length=0, which="major")
|
|
|
|
def val_to_perc(val):
|
|
return 100 * val / total_targets[0]
|
|
|
|
def perc_to_val(perc):
|
|
return perc * total_targets[0] / 100
|
|
|
|
sec_ax = plt.gca().secondary_yaxis("right", functions=(val_to_perc, perc_to_val))
|
|
sec_ax.set_ylabel("# Teilnahmen [% Erfolg]")
|
|
sec_ax.yaxis.set_major_formatter(mtick.PercentFormatter())
|
|
|
|
for total_target in total_targets:
|
|
plt.axhline(y=total_target, color="#48a9be", linestyle="--")
|
|
|
|
plt.tight_layout()
|
|
|
|
return fig
|
|
|
|
|
|
def convert_fig_to_svg(fig: plt.Figure) -> str:
    """Render *fig* as SVG and return the markup as a string."""
    with io.StringIO() as svg_buffer:
        fig.savefig(svg_buffer, format="svg")
        # ``getvalue`` returns the whole buffer regardless of the current
        # stream position, so no rewind is needed.
        return svg_buffer.getvalue()
|
|
|
|
|
|
@app.route("/")
@cache.cached()
def tables(
    url: str = "https://beschaeftigtenbefragung.verdi.de/",
    importance_factor: float = 1,
):
    """Render the overview page with the participation plot and both tables.

    The live data is downloaded from *url*. If anything in that path fails,
    the most recent files in the ``data`` folder are shown instead and the
    timestamp is rendered in red.

    Args:
        url: Address the live data is fetched from.
        importance_factor: A Landesbezirk gets its own curve in the plot when
            its latest count is at least ``importance_factor`` times the
            latest total over all Landesbezirke.

    Returns:
        The rendered ``base.html`` template.
    """

    def _render_table(df: pd.DataFrame) -> str:
        """Return *df* as an HTML table with a "Gesamt" (total) footer row."""
        df = df.astype({"Digitale Befragung": "Int32"})
        with pd.option_context("display.max_rows", None):
            table = df.to_html(
                index_names=False,
                justify="left",
                index=False,
                classes="sortable dataframe",
            )

        # Footer row: the label, empty filler cells, then the total.
        # NOTE(review): this assumes "Digitale Befragung" is the last
        # displayed column -- confirm against construct_dataframe.
        filler = [" <td></td>"] * max(len(df.columns) - 2, 0)
        tfoot = "\n".join(
            [
                " <tfoot>",
                " <tr>",
                " <td>Gesamt</td>",
                *filler,
                f" <td>{df['Digitale Befragung'].sum()}</td>",
                " </tr>",
                " </tfoot>",
            ]
        )

        # Splice the footer in right before the closing tag; ``idx - 1``
        # drops the single character preceding ``</table>`` (a newline in
        # pandas' output).
        idx = table.index("</table>")
        return "\n".join([table[: idx - 1], tfoot, table[idx:]])

    curr_datetime = datetime.datetime.now()

    try:
        df, df_state = get_tables(url)

        df = df.sort_values(
            ["Digitale Befragung", "Bundesland", "Bezirk"],
            ascending=[False, True, True],
        )

        df_state = df_state.sort_values("Landesbezirk")

        plot_df = create_plot_df(curr_datetime, df_state)
        annotate_current = True
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    except Exception as e:
        # Deliberately broad: whatever goes wrong with the live data, fall
        # back to the most recent stored snapshot instead of failing.
        print(e)
        data_dir = Path("data")
        last_file = sorted(data_dir.iterdir())[-1]
        key = last_file.name[:10]

        with (data_dir / f"{key}_data.ods").open("rb") as ff:
            df = pd.read_excel(ff, sheet_name="digital", index_col=0).astype(
                {"Digitale Befragung": "Int32"}
            )
        with (data_dir / f"{key}_state_data.ods").open("rb") as ff:
            df_state = pd.read_excel(ff, sheet_name="digital", index_col=0).astype(
                {"Digitale Befragung": "Int32"}
            )

        # No live data: pass ``None`` explicitly, the argument is required.
        plot_df = create_plot_df(curr_datetime, None)
        annotate_current = False
        timestamp = Markup(f'<font color="red">{key} 10:00:00</font>')

    # Latest counts per Landesbezirk. With live data that is the row at
    # ``curr_datetime``; in the fallback case no such row exists, so use the
    # most recent row that contains any data.
    if curr_datetime in plot_df.index:
        latest = plot_df.loc[curr_datetime]
    else:
        populated = plot_df.dropna(how="all")
        if populated.empty:
            latest = pd.Series(index=plot_df.columns, dtype="float64")
        else:
            latest = populated.iloc[-1]

    threshold = importance_factor * latest.sum()
    # ``None`` selects the nationwide curve. Missing values are skipped
    # explicitly because comparing ``pd.NA`` cannot be used as a condition.
    landesbez_strs = [None] + [
        bez
        for bez in plot_df.columns
        if pd.notna(latest[bez]) and latest[bez] >= threshold
    ]

    fig = plot(
        curr_datetime,
        plot_df,
        annotate_current=annotate_current,
        landesbez_str=landesbez_strs,
    )
    try:
        svg_string = convert_fig_to_svg(fig)
    finally:
        # Close exactly this figure, even if rendering failed.
        plt.close(fig)

    html_tables = [_render_table(df_state), _render_table(df)]

    return render_template(
        "base.html",
        tables="\n".join(html_tables),
        timestamp=timestamp,
        image=svg_string,
    )
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: start Flask's built-in server with its defaults.
    app.run()
|