src.datapreparation.topic_transformation
# Imports
import json
import string
from itertools import chain

import numpy as np
import pandas as pd
import requests as rq
from bs4 import BeautifulSoup
from nltk.corpus import stopwords as sw
from nltk.tokenize import word_tokenize
from pdfminer.high_level import extract_text

# Definitions
ADDITIONAL_STOPWORDS = ["dr", "mag", "abs.", "abs", "nr", "§§", "nr.", "bundesgesetz", "bundesminister",
                        "abgeordneter", "abgeordnete", "mitglied", "mitglieder", "gemäss", "abgeordneten",
                        "antrag", "satz", "dr.", "jedoch", "daher", "wurde", "folgt", "10", "angefügt",
                        "kraft", "gilt", "sinne", "fassung", "artikel", "bundesregierung", "sowie",
                        "XX", "XXI", "XXII", "XXIII", "XXIV", "XXV", "XXVI", "XXVII", "1.", "2.", "''",
                        "``", "bgbl", "geändert", "nationalrat"]
"""Additional stopwords to filter out of the full text of the documents."""

# Functions

def fill_na_and_convert_columns_to_object_types(colnames: list[str], df: pd.DataFrame) -> pd.DataFrame:
    """Replaces empty values in the given columns with empty lists and parses the JSON-encoded
    list strings into Python lists."""
    for col in colnames:
        # Replace lists that contain only an empty string with np.nan
        df.loc[df[col] == '[""]', col] = np.nan

        # Replace np.nan with an empty list (formatted as a string) so json.loads can be applied
        if df[col].isna().sum() > 0:
            print(f"Filling nan for column {col}.")
            df[col] = df[col].fillna('[]')
        df[col] = df[col].apply(json.loads)
    return df


def get_unique_fractions(df: pd.DataFrame) -> list[str]:
    """Returns all unique fractions (parliamentary groups) in the DataFrame."""
    return list(set(chain.from_iterable(df["Fraktionen"])))


def import_and_cleanup_csv(path: str, filename: str, legislative_period: str) -> pd.DataFrame:
    """Reads a CSV file and cleans the data by setting proper datatypes and replacing empty fields."""
    motions_df = pd.read_csv(filepath_or_buffer=path + filename, header=0, index_col=["ITYP", "INR"])
    motions_df["GP_CODE"] = legislative_period
    motions_df = motions_df.set_index("GP_CODE", append=True)
    motions_df = motions_df.reorder_levels([2, 0, 1])
    motions_df = motions_df.astype({'Art': str,
                                    'Betreff': str,
                                    'DOKTYP': str,
                                    'DOKTYP_LANG': str,
                                    'HIS_URL': str,
                                    'VHG': str,
                                    'VHG2': str})
    motions_df['Datum'] = pd.to_datetime(motions_df['Datum'], format='%d.%m.%Y')
    columns_to_cleanup = ["Fraktionen", "THEMEN", "SW", "EUROVOC"]
    motions_df = fill_na_and_convert_columns_to_object_types(columns_to_cleanup, motions_df)
    return motions_df


def generate_tsv(df: pd.DataFrame, path: str, filename: str, column_name: str):
    """Generates a .tsv file readable by WordStream from the motions DataFrame.

    Arguments:
    df -- DataFrame of motions; must contain the date column "Datum" and the fractions column "Fraktionen"
    path -- directory in which to save the .tsv file
    filename -- name of the saved file
    column_name -- name of the column used to generate the .tsv file; the column must contain lists of str
    """
    exploded_eurovoc = df[["Datum", "Fraktionen", column_name]].explode(column="Fraktionen")
    # Concatenate the keyword lists per (Datum, Fraktion)
    aggregate_voc = exploded_eurovoc.groupby(["Datum", "Fraktionen"]).agg(sum)
    topics_df = aggregate_voc.reset_index().pivot(index="Datum", columns="Fraktionen", values=column_name)
    # Cells the pivot could not fill are NaN (float); replace them with empty lists
    for col in topics_df.columns:
        topics_df[col] = topics_df[col].apply(lambda x: [] if isinstance(x, float) else x)
    topics_df = topics_df.applymap(lambda x: '|'.join(x))
    topics_df = topics_df.fillna("")
    topics_df.to_csv(path_or_buf=path + '/' + filename, sep='\t')


def generate_fulltext_tsv(df: pd.DataFrame, path: str):
    """Queries the full text of each motion from parlament.gv.at and generates a .tsv file
    from the result, saved as 'fulltext.tsv'."""

    df["DocumentLinks"] = df["DocumentLinks"].fillna("[]")
    df["DocumentLinks"] = df["DocumentLinks"].apply(lambda x: x.replace("'", '"'))
    df = _add_documents_datatypes(df)

    # Choose one document per motion, download it and extract its text
    df["document_text"] = df.DocumentLinks.apply(lambda x: choose_document_and_return_text(json.loads(x)))
    df["document_text"] = df["document_text"].apply(preprocess_text)

    generate_tsv(df, path, "fulltext.tsv", "document_text")

    print("Parsing fulltext done")


def choose_document_and_return_text(doc_links: list) -> str:
    """Chooses a document from doc_links based on the type and title of the document.
    HTML is preferred; if there is no HTML and there are multiple PDFs,
    documents that have "elektr" in the title are preferred.
    """
    if len(doc_links) == 0:
        return ""
    try:
        if len(doc_links) == 1:
            if doc_links[0]["type"].lower() == "html":
                return get_html_and_extract_text(doc_links[0]["link"])
            elif doc_links[0]["type"].lower() == "pdf":
                return get_pdf_and_extract_text(doc_links[0]["link"])
            else:
                print(f"Unknown document type: {doc_links[0]}")
                return ""

        doctypes = [doc_link["type"].lower() for doc_link in doc_links]

        # Use HTML if there is one; otherwise check how many PDFs there are and find the best one
        if "html" in doctypes:
            link = doc_links[doctypes.index("html")]["link"]
            return get_html_and_extract_text(link)
        pdf_indices = [i for i, j in enumerate(doctypes) if j == 'pdf']
        if len(pdf_indices) == 0:
            print(f"No PDF document found. Doctypes are: {doctypes}")
            return ""
        if len(pdf_indices) == 1:
            return get_pdf_and_extract_text(doc_links[pdf_indices[0]]["link"])

        if len(pdf_indices) > 1:
            for i in pdf_indices:
                print(f'Doctitles where there are multiple pdfs: {i}:{doc_links[i]["title"]}')
            doc_link = _decide_on_doc([doc_links[idx] for idx in pdf_indices])
            if doc_link != "":
                return get_pdf_and_extract_text(doc_link)
    except KeyError as e:
        print(e)
    return ""


def _decide_on_doc(doc_links: list) -> str:
    """Returns the link of the first document whose title contains "elektr", or "" if there is none."""
    for doc_link in doc_links:
        if "elektr" in doc_link["title"]:
            return doc_link["link"]
    return ""


def _has_doctype(doc_dict_list: list, doctype: str) -> bool:
    """Checks whether doc_dict_list contains a document of the given doctype."""
    for doc_dict in doc_dict_list:
        if "type" in doc_dict and doc_dict["type"] == doctype:
            return True
    return False


def _add_documents_datatypes(df: pd.DataFrame) -> pd.DataFrame:
    """Adds helper columns describing how many documents a motion has and which types are present."""
    df["nDocs"] = df.DocumentLinks.apply(lambda x: len(json.loads(x)))
    df["hasPDF"] = df.DocumentLinks.apply(lambda x: _has_doctype(json.loads(x), "PDF"))
    df["hasHTML"] = df.DocumentLinks.apply(lambda x: _has_doctype(json.loads(x), "HTML"))
    df = df.assign(checkThis=lambda x: (x.hasPDF.astype(int) + x.hasHTML.astype(int)) != x.nDocs)
    print(f"There are {sum(df.checkThis)} out of {len(df.checkThis)} motions with multiple documents of one type")
    return df


def get_html_and_extract_text(relative_link: str) -> str:
    """Downloads the HTML document from parlament.gv.at/<relative_link> and returns its text."""
    base_link = "https://www.parlament.gv.at"

    response = rq.get(base_link + relative_link)
    html_doc = response.content
    soup = BeautifulSoup(html_doc, features="html.parser")
    # Remove all script and style elements
    for script in soup(["script", "style"]):
        script.extract()

    # Get the visible text
    text = soup.get_text()
    return text


def get_pdf_and_extract_text(relative_link: str) -> str:
    """Downloads the PDF document from parlament.gv.at/<relative_link> and returns its text."""
    base_link = "https://www.parlament.gv.at"

    pdf = rq.get(base_link + relative_link)

    # Write the download to a temporary file so pdfminer can parse it
    with open('example.pdf', 'wb') as f:
        f.write(pdf.content)

    text = extract_text("example.pdf")

    return text


def preprocess_text(text: str) -> list[str]:
    """Tokenizes and casefolds the text and removes NLTK German stopwords plus the additional
    stopwords defined above. Also removes all tokens of length 1."""
    tokenized_text = word_tokenize(text, "german")

    lower_text = [word.casefold() for word in tokenized_text]

    stopwords = sw.words("german")

    # Add the domain-specific words to the stopword list
    stopwords.extend(ADDITIONAL_STOPWORDS)
    clean_text = [word for word in lower_text if word not in stopwords and word not in string.punctuation]
    clean_text = [word for word in clean_text if len(word) > 1]
    return clean_text


# Main

if __name__ == '__main__':
    legislative_period = "XXVII"
    path = "../data/" + legislative_period + "/"
    filename = "antraege.csv"

    clean_df = import_and_cleanup_csv(path, filename, legislative_period)
    only_fractions_and_documents = clean_df.loc[:, ["Datum", "Fraktionen", "DocumentLinks"]]
    generate_fulltext_tsv(only_fractions_and_documents, path)

    # Use this to generate topic files based on EuroVoc keywords
    # generate_tsv(clean_df, path, "eurovoc.tsv", "EUROVOC")
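Beyond running the module as a script, its functions can be imported from other code. A minimal sketch, assuming the project root is on the Python path so that src.datapreparation is importable and that the motions CSV exists at the path used in the main block:

from src.datapreparation import topic_transformation as tt

# Hypothetical data location, mirroring the main block above
motions = tt.import_and_cleanup_csv("../data/XXVII/", "antraege.csv", "XXVII")
print(tt.get_unique_fractions(motions))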
Additional stopwords to filter out of the full text of the documents.
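For orientation, a small sketch of how this list is used: preprocess_text (below) simply appends it to NLTK's German stopword list before filtering. This assumes the NLTK stopwords corpus has been downloaded, e.g. via nltk.download("stopwords").

from nltk.corpus import stopwords as sw

from src.datapreparation.topic_transformation import ADDITIONAL_STOPWORDS

# Combined filter list: standard German stopwords plus the domain-specific terms above
german_stopwords = sw.words("german")
german_stopwords.extend(ADDITIONAL_STOPWORDS)
print(len(german_stopwords))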
def fill_na_and_convert_columns_to_object_types(colnames: list[str], df: pd.DataFrame) -> pd.DataFrame:
    """Replaces empty values in the given columns with empty lists and parses the JSON-encoded
    list strings into Python lists."""
    for col in colnames:
        # Replace lists that contain only an empty string with np.nan
        df.loc[df[col] == '[""]', col] = np.nan

        # Replace np.nan with an empty list (formatted as a string) so json.loads can be applied
        if df[col].isna().sum() > 0:
            print(f"Filling nan for column {col}.")
            df[col] = df[col].fillna('[]')
        df[col] = df[col].apply(json.loads)
    return df
Replaces empty values in the given columns with empty lists and parses the JSON-encoded list strings into Python lists.
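A minimal illustration on a toy DataFrame (the column name and values are made up for the example):

import numpy as np
import pandas as pd

from src.datapreparation.topic_transformation import fill_na_and_convert_columns_to_object_types

toy = pd.DataFrame({"EUROVOC": ['["Umwelt", "Verkehr"]', '[""]', np.nan]})
toy = fill_na_and_convert_columns_to_object_types(["EUROVOC"], toy)
print(toy["EUROVOC"].tolist())  # [['Umwelt', 'Verkehr'], [], []]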
def get_unique_fractions(df: pd.DataFrame) -> list[str]:
    """Returns all unique fractions (parliamentary groups) in the DataFrame."""
    return list(set(chain.from_iterable(df["Fraktionen"])))
Returns all unique fractions (parliamentary groups) in the DataFrame.
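For example, on a frame whose "Fraktionen" column already contains lists (as produced by import_and_cleanup_csv); the party codes are sample values:

import pandas as pd

from src.datapreparation.topic_transformation import get_unique_fractions

df = pd.DataFrame({"Fraktionen": [["SPÖ", "NEOS"], ["SPÖ"], ["GRÜNE", "NEOS"]]})
print(sorted(get_unique_fractions(df)))  # ['GRÜNE', 'NEOS', 'SPÖ']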
def import_and_cleanup_csv(path: str, filename: str, legislative_period: str) -> pd.DataFrame:
    """Reads a CSV file and cleans the data by setting proper datatypes and replacing empty fields."""
    motions_df = pd.read_csv(filepath_or_buffer=path + filename, header=0, index_col=["ITYP", "INR"])
    motions_df["GP_CODE"] = legislative_period
    motions_df = motions_df.set_index("GP_CODE", append=True)
    motions_df = motions_df.reorder_levels([2, 0, 1])
    motions_df = motions_df.astype({'Art': str,
                                    'Betreff': str,
                                    'DOKTYP': str,
                                    'DOKTYP_LANG': str,
                                    'HIS_URL': str,
                                    'VHG': str,
                                    'VHG2': str})
    motions_df['Datum'] = pd.to_datetime(motions_df['Datum'], format='%d.%m.%Y')
    columns_to_cleanup = ["Fraktionen", "THEMEN", "SW", "EUROVOC"]
    motions_df = fill_na_and_convert_columns_to_object_types(columns_to_cleanup, motions_df)
    return motions_df
Reads a CSV file and cleans the data by setting proper datatypes and replacing empty fields.
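A sketch of a typical call, assuming an export of motions named antraege.csv for legislative period XXVII lies under ../data/XXVII/, as in the module's main block:

from src.datapreparation.topic_transformation import import_and_cleanup_csv

motions_df = import_and_cleanup_csv("../data/XXVII/", "antraege.csv", "XXVII")
print(motions_df.index.names)      # ['GP_CODE', 'ITYP', 'INR']
print(motions_df["Datum"].dtype)   # datetime64[ns]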
def generate_tsv(df: pd.DataFrame, path: str, filename: str, column_name: str):
    """Generates a .tsv file readable by WordStream from the motions DataFrame.

    Arguments:
    df -- DataFrame of motions; must contain the date column "Datum" and the fractions column "Fraktionen"
    path -- directory in which to save the .tsv file
    filename -- name of the saved file
    column_name -- name of the column used to generate the .tsv file; the column must contain lists of str
    """
    exploded_eurovoc = df[["Datum", "Fraktionen", column_name]].explode(column="Fraktionen")
    # Concatenate the keyword lists per (Datum, Fraktion)
    aggregate_voc = exploded_eurovoc.groupby(["Datum", "Fraktionen"]).agg(sum)
    topics_df = aggregate_voc.reset_index().pivot(index="Datum", columns="Fraktionen", values=column_name)
    # Cells the pivot could not fill are NaN (float); replace them with empty lists
    for col in topics_df.columns:
        topics_df[col] = topics_df[col].apply(lambda x: [] if isinstance(x, float) else x)
    topics_df = topics_df.applymap(lambda x: '|'.join(x))
    topics_df = topics_df.fillna("")
    topics_df.to_csv(path_or_buf=path + '/' + filename, sep='\t')
Generates a .tsv file readable by WordStream from the motions DataFrame.
Arguments:
df -- DataFrame of motions; must contain the date column "Datum" and the fractions column "Fraktionen"
path -- directory in which to save the .tsv file
filename -- name of the saved file
column_name -- name of the column used to generate the .tsv file; the column must contain lists of str
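A minimal sketch with a toy DataFrame; the dates, party codes and EuroVoc keywords are made-up sample values:

import pandas as pd

from src.datapreparation.topic_transformation import generate_tsv

toy = pd.DataFrame({
    "Datum": pd.to_datetime(["2020-01-10", "2020-01-10", "2020-02-14"]),
    "Fraktionen": [["SPÖ"], ["NEOS", "SPÖ"], ["GRÜNE"]],
    "EUROVOC": [["Umweltpolitik"], ["Steuerrecht"], ["Bildung", "Digitalisierung"]],
})
# Writes ./eurovoc.tsv with one row per date and one column per fraction,
# each cell holding that fraction's keywords joined by '|'
generate_tsv(toy, ".", "eurovoc.tsv", "EUROVOC")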
def generate_fulltext_tsv(df: pd.DataFrame, path: str):
    """Queries the full text of each motion from parlament.gv.at and generates a .tsv file
    from the result, saved as 'fulltext.tsv'."""

    df["DocumentLinks"] = df["DocumentLinks"].fillna("[]")
    df["DocumentLinks"] = df["DocumentLinks"].apply(lambda x: x.replace("'", '"'))
    df = _add_documents_datatypes(df)

    # Choose one document per motion, download it and extract its text
    df["document_text"] = df.DocumentLinks.apply(lambda x: choose_document_and_return_text(json.loads(x)))
    df["document_text"] = df["document_text"].apply(preprocess_text)

    generate_tsv(df, path, "fulltext.tsv", "document_text")

    print("Parsing fulltext done")
Queries the full text of each motion from parlament.gv.at and generates a .tsv file from the result, saved as 'fulltext.tsv'.
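A sketch of how this is typically driven (it mirrors the module's main block); note that one HTTP request is made per motion against parlament.gv.at, so a full legislative period takes a while. The data paths are the same assumptions as above:

from src.datapreparation.topic_transformation import import_and_cleanup_csv, generate_fulltext_tsv

path = "../data/XXVII/"
clean_df = import_and_cleanup_csv(path, "antraege.csv", "XXVII")
subset = clean_df.loc[:, ["Datum", "Fraktionen", "DocumentLinks"]]
generate_fulltext_tsv(subset, path)   # writes fulltext.tsv into the given path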
def choose_document_and_return_text(doc_links: list) -> str:
    """Chooses a document from doc_links based on the type and title of the document.
    HTML is preferred; if there is no HTML and there are multiple PDFs,
    documents that have "elektr" in the title are preferred.
    """
    if len(doc_links) == 0:
        return ""
    try:
        if len(doc_links) == 1:
            if doc_links[0]["type"].lower() == "html":
                return get_html_and_extract_text(doc_links[0]["link"])
            elif doc_links[0]["type"].lower() == "pdf":
                return get_pdf_and_extract_text(doc_links[0]["link"])
            else:
                print(f"Unknown document type: {doc_links[0]}")
                return ""

        doctypes = [doc_link["type"].lower() for doc_link in doc_links]

        # Use HTML if there is one; otherwise check how many PDFs there are and find the best one
        if "html" in doctypes:
            link = doc_links[doctypes.index("html")]["link"]
            return get_html_and_extract_text(link)
        pdf_indices = [i for i, j in enumerate(doctypes) if j == 'pdf']
        if len(pdf_indices) == 0:
            print(f"No PDF document found. Doctypes are: {doctypes}")
            return ""
        if len(pdf_indices) == 1:
            return get_pdf_and_extract_text(doc_links[pdf_indices[0]]["link"])

        if len(pdf_indices) > 1:
            for i in pdf_indices:
                print(f'Doctitles where there are multiple pdfs: {i}:{doc_links[i]["title"]}')
            doc_link = _decide_on_doc([doc_links[idx] for idx in pdf_indices])
            if doc_link != "":
                return get_pdf_and_extract_text(doc_link)
    except KeyError as e:
        print(e)
    return ""
Chooses a document from doc_links based on the type and title of the document. HTML is preferred; if there is no HTML and there are multiple PDFs, documents that have "elektr" in the title are preferred.
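The expected shape of doc_links is a list of dicts with "title", "type" and "link" keys, as stored in the DocumentLinks column. A small illustration with made-up entries; here the HTML variant would be chosen and downloaded:

from src.datapreparation.topic_transformation import choose_document_and_return_text

doc_links = [
    {"title": "Antrag (PDF)", "type": "PDF", "link": "/dokument/XXVII/A/123/beispiel.pdf"},
    {"title": "Antrag (HTML)", "type": "HTML", "link": "/dokument/XXVII/A/123/beispiel.html"},
]
# text = choose_document_and_return_text(doc_links)
# -> would fetch the HTML variant, since HTML is preferred over PDF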
def get_html_and_extract_text(relative_link: str) -> str:
    """Downloads the HTML document from parlament.gv.at/<relative_link> and returns its text."""
    base_link = "https://www.parlament.gv.at"

    response = rq.get(base_link + relative_link)
    html_doc = response.content
    soup = BeautifulSoup(html_doc, features="html.parser")
    # Remove all script and style elements
    for script in soup(["script", "style"]):
        script.extract()

    # Get the visible text
    text = soup.get_text()
    return text
Downloads the HTML document from parlament.gv.at/<relative_link> and returns its text.
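A sketch of a direct call; the relative link below is a placeholder, real values come from the DocumentLinks column, and the call performs a live HTTP request:

from src.datapreparation.topic_transformation import get_html_and_extract_text

relative_link = "/dokument/XXVII/A/123/beispiel.html"  # hypothetical document path
text = get_html_and_extract_text(relative_link)
print(text[:200])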
def get_pdf_and_extract_text(relative_link: str) -> str:
    """Downloads the PDF document from parlament.gv.at/<relative_link> and returns its text."""
    base_link = "https://www.parlament.gv.at"

    pdf = rq.get(base_link + relative_link)

    # Write the download to a temporary file so pdfminer can parse it
    with open('example.pdf', 'wb') as f:
        f.write(pdf.content)

    text = extract_text("example.pdf")

    return text
Downloads the PDF document from parlament.gv.at/<relative_link> and returns its text.
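Usage mirrors get_html_and_extract_text. The relative link below is again a placeholder and needs to be replaced by a real PDF path from DocumentLinks; note that the download is written to example.pdf in the current working directory before pdfminer parses it:

from src.datapreparation.topic_transformation import get_pdf_and_extract_text

relative_link = "/dokument/XXVII/A/123/beispiel.pdf"  # hypothetical document path
text = get_pdf_and_extract_text(relative_link)
print(text[:200])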
def preprocess_text(text: str) -> list[str]:
    """Tokenizes and casefolds the text and removes NLTK German stopwords plus the additional
    stopwords defined above. Also removes all tokens of length 1."""
    tokenized_text = word_tokenize(text, "german")

    lower_text = [word.casefold() for word in tokenized_text]

    stopwords = sw.words("german")

    # Add the domain-specific words to the stopword list
    stopwords.extend(ADDITIONAL_STOPWORDS)
    clean_text = [word for word in lower_text if word not in stopwords and word not in string.punctuation]
    clean_text = [word for word in clean_text if len(word) > 1]
    return clean_text
Tokenizes and casefolds the text and removes NLTK German stopwords plus the additional stopwords defined above. Also removes all tokens of length 1.
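A small illustration; this assumes the NLTK tokenizer models and the stopwords corpus have already been downloaded (e.g. via nltk.download):

from src.datapreparation.topic_transformation import preprocess_text

tokens = preprocess_text("Der Nationalrat wolle beschließen: Die Bundesregierung wird aufgefordert, rasch zu handeln.")
print(tokens)  # roughly ['wolle', 'beschließen', 'aufgefordert', 'rasch', 'handeln']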