Skip to content
Snippets Groups Projects
Commit 3279b869 authored by Bulk, Leonieke van den
Browse files

Fixed tiny error

parent c4877fb4
Branches
No related tags found
No related merge requests found
%% Cell type:code id: tags:
``` python
"""Tools for WFSR.
By: Lennert van Overbeeke
"""
```
%% Cell type:code id: tags:
``` python
# imports
# custom imports
import wfsr
# basics
import os
import logging
from time import sleep
from datetime import datetime, timedelta
# web
import html
import signal
import unicodedata
import googletrans
from selenium import webdriver
# data
import json
import hashlib
import openpyxl
import pandas as pd
from multiprocessing import Pool
from xlrd import XLRDError
from pkg_resources import resource_filename
from openpyxl.utils import get_column_letter
# geo
import pycountry
import shapefile
from shapely.geometry import Point # Point class
from shapely.geometry import shape # shape() is a function to convert geo objects through the interface
# shapely is installed by pip from a .whl file
# make sure to include it in the package data
```
%% Output
importing Jupyter notebook from C:\Users\overb015\.conda\envs\general\lib\site-packages\wfsr\data.ipynb
importing Jupyter notebook from C:\Users\overb015\.conda\envs\general\lib\site-packages\wfsr\tools.ipynb
importing Jupyter notebook from C:\Users\overb015\.conda\envs\general\lib\site-packages\wfsr\elastic.ipynb
%% Cell type:code id: tags:
``` python
# Time
def parallel(function, iterable, threads=5):
    """Map ``function`` over ``iterable`` with a pool of worker processes.

    Args:
        function: Callable applied to every item of ``iterable``.
        iterable: Items handed to ``function`` one at a time.
        threads: Size of the ``multiprocessing.Pool`` (default 5). Must be
            an ``int``.

    Returns:
        The list produced by ``Pool.map``, in the order of ``iterable``.

    Raises:
        AssertionError: If ``threads`` is not an ``int``.
    """
    assert isinstance(threads, int)
    pool = Pool(threads)
    with pool as workers:
        results = workers.map(function, iterable)
    return results
class Scheduler():
    """Run a function repeatedly, waiting a fixed interval before each run.

    Args:
        hours: Hours to wait before each run (default 1).
        minutes: Additional minutes to wait before each run (default 0).
    """

    def __init__(self, hours=1, minutes=0):
        self.hours = hours
        self.minutes = minutes

    def get_target(self):
        """Return the next run time: now plus the interval, cut to the minute.

        The interval is added with ``timedelta`` so that it carries over
        into the next hour or day. Wrapping hour and minute with a modulo
        would produce a moment in the past (e.g. 23:30 + 1h -> 00:30 of
        the same day).
        """
        interval = timedelta(hours=self.hours, minutes=self.minutes)
        target = datetime.now() + interval
        return target.replace(second=0, microsecond=0)

    def wait(self):
        """Block, polling once per second, until the target time is reached."""
        target = self.get_target()
        while target > datetime.now():
            sleep(1)

    def start(self, function, *args):
        """Loop forever: wait for the interval, then call ``function(*args)``."""
        while True:
            self.wait()
            function(*args)
```
%% Cell type:code id: tags:
``` python
# Web scraping tools
def get_phantomjs_driver():
    """Return a PhantomJS webdriver for loading JavaScript-generated pages.

    The binary packaged as ``datafiles/phantomjs`` is tried first; if
    starting it raises ``OSError`` the ``phantomjs.exe`` binary is used
    instead.
    """
    unix_binary = resource_filename('wfsr', 'datafiles/phantomjs')
    win_binary = resource_filename('wfsr', 'datafiles/phantomjs.exe')
    try:
        return webdriver.PhantomJS(unix_binary)
    except OSError:
        return webdriver.PhantomJS(win_binary)
def get_chrome_driver():
    """Return a headless Chrome webdriver (marked WORK IN PROGRESS).

    The packaged ``chromedriver`` binary is tried first and
    ``chromedriver.exe`` is used when that raises ``OSError``. The driver
    is given an implicit wait of 10 seconds before it is returned.
    """
    unix_binary = resource_filename('wfsr', 'datafiles/chromedriver')
    win_binary = resource_filename('wfsr', 'datafiles/chromedriver.exe')
    options = webdriver.ChromeOptions()
    for flag in ('--no-sandbox', '--headless', '--disable-gpu'):
        options.add_argument(flag)
    try:
        browser = webdriver.Chrome(unix_binary, chrome_options=options)
    except OSError:
        browser = webdriver.Chrome(win_binary, chrome_options=options)
    browser.implicitly_wait(10)
    return browser
def get_html(url):
    """Fetch the rendered page HTML for a given url using PhantomJS.

    Args:
        url: Address of the page to load.

    Returns:
        The page source reported by the driver after loading ``url``.

    The driver is shut down in a ``finally`` clause so the PhantomJS child
    process is also cleaned up when loading the page raises.
    """
    driver = get_phantomjs_driver()
    try:
        # fetch the website contents, which takes a few seconds
        driver.get(url)
        # get pure HTML
        return driver.page_source
    finally:
        # kill the specific phantomjs child process before quitting
        driver.service.process.send_signal(signal.SIGTERM)
        driver.quit()
```
%% Cell type:code id: tags:
``` python
def get_md5(s):
    """Return the hexadecimal MD5 digest of the string ``s``.

    The string is encoded with ``str.encode()`` defaults before hashing.
    """
    digest = hashlib.md5()
    digest.update(s.encode())
    return digest.hexdigest()
```
%% Cell type:code id: tags:
``` python
# normalize text
def normalize_text(text):
    """Return ``text`` NFKC-normalized with HTML entities unescaped.

    Unescaping is applied three times in a row so that entities nested up
    to three levels deep (e.g. ``&amp;amp;lt;``) are resolved.
    """
    # Compose modified characters into single code points
    cleaned = unicodedata.normalize('NFKC', text)
    # Translate HTML-escaped characters back, repeating for nested escapes
    for _ in range(3):
        cleaned = html.unescape(cleaned)
    return cleaned
```
%% Cell type:code id: tags:
``` python
# Reference data
# Location of the packaged reference workbook
data_file = resource_filename('wfsr', 'datafiles/data.xlsx')
# The country table requires some modification: the code columns are read
# as strings instead of letting pandas infer their types
_countries_converters = {
    'alpha2': str,
    'eurostat_alpha2': str,
    'alpha3': str,
    'numeric': str,
}
# Load the country table once, with the openpyxl engine named explicitly.
# keep_default_na=False disables pandas' default NA strings.
_countries = pd.read_excel(
    data_file,
    sheet_name='countries',
    converters=_countries_converters,
    keep_default_na=False,
    engine='openpyxl',
).fillna('')
# Synonyms are stored as one '|'-separated string per country
_countries['synonyms'] = _countries['synonyms'].str.split('|')
# Normalized, case-folded country name for caseless comparison
_countries['country_caseless'] = _countries['country'].apply(lambda x: normalize_text(x).casefold())
```
%% Cell type:code id: tags:
``` python
# Country polygons from the packaged shapefile
_shape_file = resource_filename('wfsr', 'datafiles/ne_50m_admin_0_countries')
_shape_reader = shapefile.Reader(_shape_file)
# Field names used as keys for each record. pyshp lists a leading
# 'DeletionFlag' descriptor in ``fields`` that has no matching value in the
# records; it is skipped so names and values line up when zipped.
# NOTE(review): based on the pyshp documentation - confirm against the
# installed version.
_shape_keys = [x[0] for x in _shape_reader.fields if x[0] != 'DeletionFlag']
# List of (ISO alpha-3 code, record dict, shapely geometry) per country
_shape_countries = []
for country in _shape_reader.shapeRecords():
    rec = {k: v for k, v in zip(_shape_keys, country.record)}
    shp = shape(country.shape)
    iso = rec['ISO_A3']
    # entries whose code is the placeholder '-99' are left out
    if iso != '-99':
        _shape_countries.append((iso, rec, shp))
def find_country_from_coordinates(lat, lon):
    """Return the country row whose polygon contains the given coordinates.

    Args:
        lat: Latitude, anything ``float()`` accepts.
        lon: Longitude, anything ``float()`` accepts.

    Returns:
        The result of ``find_country`` for the ISO code of the first
        polygon containing the point, or ``find_country('99')`` when no
        polygon matches or the coordinates are not numeric.
    """
    try:
        latitude = float(lat)
        longitude = float(lon)
    except (TypeError, ValueError):
        # float() raises ValueError for non-numeric strings and TypeError
        # for unsupported types; report both and return the fallback
        # instead of continuing with unbound coordinates.
        print(
            "Please make sure lat and lon are numeric.\n"
            f"lat: {lat}\n"
            f"lon: {lon}"
        )
        return find_country('99')
    p = Point(longitude, latitude)  # the order matters: x=lon, y=lat
    for iso, rec, shp in _shape_countries:
        if shp.contains(p):
            return find_country(iso)
    return find_country('99')
def test_find_country_from_coordinates():
    """Check that the point lat=52, lon=5 resolves to alpha3 code 'NLD'."""
    match = find_country_from_coordinates(52, 5)
    assert match.alpha3 == 'NLD'
```
%% Cell type:code id: tags:
``` python
# Country finder
def find_country(string, verbose=True):
    """Return the row of the country table that matches ``string``.

    The input is normalized and case-folded, resolved through
    ``pycountry`` where possible, and then compared against every value
    and the synonyms of each row in ``_countries``.

    Args:
        string: Country name or code; converted with ``str()``.
        verbose: When true, print the normalized input if nothing matches.

    Returns:
        The matching row of ``_countries``. When the input is too short or
        nothing matches, the result of ``find_country('99')``.

    NOTE(review): the fallback recurses on '99'; this presumably relies on
    the country table containing a '99' entry - confirm.
    """
    try:
        # make sure input is a normalized, case-folded string
        string = normalize_text(str(string).strip()).casefold()
    except AttributeError:
        # if this fails, return the country equivalent of None
        return find_country('99')
    # Make sure there is enough data for a positive identification.
    # An explicit check is used because assert statements are removed
    # when Python runs with -O.
    if len(string) <= 1:
        return find_country('99')
    try:
        # use a library to catch the most common cases
        hit = pycountry.countries.lookup(string)
        string = hit.alpha_2.casefold()
    except LookupError:
        pass
    for _, row in _countries.iterrows():
        # the synonyms test does not depend on the individual value, so it
        # is done once per row
        if string in row['synonyms']:
            return row
        for value in row.values:
            if string == normalize_text(str(value).strip()).casefold():
                return row
    # print failures unless verbose=False
    if verbose:
        print(string)
    return find_country('99')
def bulk_find_country(l, field='alpha2', verbose=False):
    """Translate a long list or pd.Series of country names in bulk.

    Each distinct value is resolved once with ``find_country``; the input
    is then translated through that lookup table and the requested
    ``field`` of every match is returned.

    Args:
        l: ``list`` or ``pd.Series`` of country names.
        field: Column of the country table to return (default 'alpha2').
        verbose: Passed on to ``find_country``.

    Returns:
        A list for list input; a ``pd.Series`` with the index of ``l`` for
        Series input.
    """
    assert isinstance(l, (list, pd.Series))
    assert field in _countries.columns

    def _fields_for(distinct):
        # resolve each distinct value once, then map every item of ``l``
        lookup = {name: find_country(name, verbose=verbose) for name in distinct}
        matches = [lookup.get(item) for item in l]
        return [getattr(match, field) for match in matches]

    if isinstance(l, list):
        return _fields_for(set(l))
    if isinstance(l, pd.Series):
        return pd.Series(_fields_for(l.unique()), index=l.index)
```
%% Cell type:code id: tags:
``` python
# logging
def get_logger(name):
    """Return a logger that also writes to ``<name>.log`` in the working directory.

    Use ``logger.info()`` (or any other level method) to log lines.

    Args:
        name: Logger name; also the base name of the log file.

    Returns:
        The ``logging.Logger`` registered under ``name`` with a DEBUG-level
        file handler attached. Calling the function again for the same
        file reuses the attached handler rather than adding a second one,
        which would write every record twice.
    """
    logging.basicConfig(level=logging.DEBUG)
    # os.path.join uses the platform's separator; a hard-coded backslash
    # only forms a directory boundary on Windows
    logfile = os.path.join(os.getcwd(), name + '.log')
    print('Logging to file {}'.format(logfile))
    logger = logging.getLogger(name)
    target = os.path.abspath(logfile)
    already_attached = any(
        isinstance(existing, logging.FileHandler) and existing.baseFilename == target
        for existing in logger.handlers
    )
    if not already_attached:
        # create a file handler
        handler = logging.FileHandler(logfile)
        handler.setLevel(logging.DEBUG)
        # create a logging format and assign
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)-5s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
```
%% Cell type:code id: tags:
``` python
class Excel:
    """Helper around an Excel workbook stored at ``filePath``.

    Writes DataFrames to sheets with filter and frozen header enabled,
    logs edits to an 'info' sheet and helps add sheets without
    overwriting the rest of the workbook.

    Args:
        filePath: Path of the workbook; created if it does not exist.
        info: Accepted for backward compatibility; not used.
        verbose: When true, ``write`` prints a confirmation.
    """

    def __init__(self, filePath, info=None, verbose=True):
        self.f = filePath
        self.verbose = verbose
        self.get_book()

    def get_book(self):
        """Load the workbook at ``self.f`` into ``self.book``, creating it if missing.

        A new workbook gets its first sheet renamed to 'info'. The 'info'
        sheet is given a tab color and a log row is appended to it.
        """
        try:
            book = openpyxl.load_workbook(self.f)
            msg = 'File loaded.'
        except FileNotFoundError:
            book = openpyxl.Workbook()
            msg = 'File created.'
            book.worksheets[0].title = 'info'
            book.save(self.f)
        self.book = book
        info = self.get_sheet('info')
        info.sheet_properties.tabColor = "1072BA"
        self.log('info', msg)

    def get_sheet(self, sheet_name):
        """Return existing or create new sheet in the workbook."""
        try:
            return self.book[sheet_name]
        except KeyError:
            return self.book.create_sheet(sheet_name)

    def image_to_sheet(self, image, sheet_name="Image", anchor='A1'):
        """Add ``image`` to a sheet at ``anchor``, log the addition and save."""
        ws = self.get_sheet(sheet_name)
        img = openpyxl.drawing.image.Image(image)
        ws.add_image(img, anchor=anchor)
        self.log('info', f'Image added to to sheet {sheet_name}')
        self.save()

    def log(self, sheet_name, *args):
        """Append row with any number of columns at the end of a sheet.

        First column is the timestamp of addition. The workbook is saved
        afterwards.
        """
        row = [datetime.now().strftime('%Y-%m-%d %H:%M')]
        row.extend(args)
        sheet = self.get_sheet(sheet_name)
        sheet.append(row)
        self.save()

    def overwrite_sheet(self, sheet_name, df):
        """Write pandas DataFrame to sheet. Overwrite if existing.

        An existing sheet of that name is removed from the file first; the
        DataFrame is then appended with ``ExcelWriter`` in append mode, so
        the other sheets are kept. This avoids assigning ``writer.book``
        and calling ``writer.save()``, which current pandas releases no
        longer support. ``self.book`` is reloaded from disk afterwards.
        """
        book = openpyxl.load_workbook(self.f)
        mode = 'a'
        if sheet_name in book.sheetnames:
            if len(book.sheetnames) > 1:
                del book[sheet_name]
                self.book = book
                self.save()
            else:
                # A workbook cannot be saved without sheets; the file only
                # holds the sheet being replaced, so rewrite it entirely.
                mode = 'w'
        book.close()
        with pd.ExcelWriter(self.f, engine='openpyxl', mode=mode) as writer:
            df.to_excel(writer, sheet_name=sheet_name)
        self.book = openpyxl.load_workbook(self.f)

    def write(self, df, sheet_name="Data", msg="", filters=True):
        """Write a df to a named sheet (default 'Data') in the workbook.

        The write is logged on the 'info' sheet together with ``msg``.
        With ``filters`` enabled an auto-filter is set over the used range
        and the first row is frozen.
        """
        info = 'Writing data to sheet {}.'.format(sheet_name)
        self.log('info', info, msg)
        self.overwrite_sheet(sheet_name, df)
        if filters:
            sheet = self.get_sheet(sheet_name)
            full_sheet = "A1:" + get_column_letter(sheet.max_column) + str(sheet.max_row)
            self.book[sheet_name].auto_filter.ref = full_sheet
            sheet.freeze_panes = 'A2'
        self.save()
        self.close()
        if self.verbose:
            print('{}\nFile: {}'.format(info, self.f))

    def save(self):
        """Save the workbook, retrying every 3 seconds while permission is denied."""
        while True:
            try:
                self.book.save(self.f)
                break
            except PermissionError:
                print(
                    f"No permission to write to file: {os.getcwd()}/{self.f}.\n"
                    "Please close the file in order to save.\n"
                    "If the error persists, you don't have permission to write."
                )
                sleep(3)

    def close(self):
        """Close the underlying workbook."""
        self.book.close()

    def to_df(self, sheet_name):
        """Return a sheet as pandas DataFrame (empty if the sheet is missing)."""
        try:
            df = pd.read_excel(self.f, sheet_name=sheet_name, engine='openpyxl')
        except (XLRDError, KeyError, ValueError):
            # NOTE(review): with the openpyxl engine a missing sheet is
            # reported as KeyError or ValueError depending on the pandas
            # version, not as XLRDError - confirm for the pinned version.
            df = pd.DataFrame()
        return df

    def to_dict(self):
        """Return the entire workbook as a dict of pandas DataFrames"""
        d = pd.read_excel(self.f, sheet_name=None, engine='openpyxl')
        return d
```
%% Cell type:code id: tags:
``` python
# quick translate
def translate(query, to_lang='en', file_path=None, meta=False, verbose=True):
    """Translate text through googletrans, optionally caching in an Excel file.

    Arguments:
        query = String or list of strings. Empty strings in a list are
            dropped.
        to_lang = Sets the language to translate to.
            Default: 'en' for English.
        file_path = String: File path to store (intermediate) results.
            Default: None, nothing is read or stored.
        meta = Boolean: False (default) returns text only.
            True returns a JSON string per translation.
        verbose = Boolean: print a notice when cached translations from
            ``file_path`` are used.

    Returns:
        For a single query string (or a list holding one non-empty item)
        the translation itself; otherwise a dict mapping each query to
        its translation.

    Raises:
        AssertionError: If ``query`` is neither a string nor a list.
    """

    def validate_input(query):
        """Transform input to list of non-empty strings."""
        assert isinstance(query, (str, list))
        if isinstance(query, list):
            return [str(q) for q in query if len(str(q)) > 0]
        return [query]

    def serialise(response):
        """Return a JSON string for a translation result object."""
        try:
            return json.dumps(response)
        except TypeError:
            # NOTE(review): googletrans result objects do not appear to be
            # JSON-serializable, and the attribute names below are assumed
            # from its result model - confirm against the installed
            # version. Missing attributes are stored as null.
            names = ('src', 'dest', 'origin', 'text', 'pronunciation')
            payload = {name: getattr(response, name, None) for name in names}
            return json.dumps(payload, default=str)

    def get_translation(string):
        """Query the Google API and return a string of the translation or,
        if meta=True, a JSON string of the translation object.
        """
        try:
            translator = googletrans.Translator(service_urls=['translate.googleapis.com'])
            response = translator.translate(string, dest=to_lang)
            if meta:
                return serialise(response)
            return response.text
        except Exception as e:
            return exception_handler(string, e)

    def exception_handler(string, e):
        """Retry with a shorter query, return a placeholder, or re-raise."""
        # ``doc`` is read from the exception's instance attributes; it is
        # treated as empty when absent or None.
        doc = e.__dict__.get('doc', '') or ''
        if 'request that was too large' in doc:
            # Trim to 3200 characters or shorter if necessary
            new_len = min(len(string) - 100, 3200)
            if new_len <= 0:
                # nothing sensible is left to trim; do not retry forever
                raise e
            print(f"Request too large. Trimming query string to {new_len} characters.")
            return get_translation(string[:new_len])
        elif 'unusual traffic' in doc:
            print("Google API: daily limit reached. Try again in 24h.")
            # NOTE(review): this placeholder is stored like a translation,
            # so it will also be written to the cache file.
            return f"<untranslated: {string}>"
        else:
            raise e

    def read_result(file_path):
        """Return cached translations from the 'Data' sheet as a dict."""
        excel = Excel(file_path)
        df = excel.to_df("Data")
        if 'query' not in df.columns or 'result' not in df.columns:
            # no usable cache (e.g. the sheet is missing or empty)
            return {}
        return df.set_index('query').to_dict()['result']

    def store_result(result, newly_translated):
        """Write all translations to the 'Data' sheet of ``file_path``."""
        excel = Excel(file_path, verbose=False)
        df = pd.DataFrame(data={
            'query': list(result.keys()),
            'result': list(result.values()),
        })
        msg = f"New queries: {newly_translated}."
        excel.write(df, msg=msg, sheet_name="Data")

    def main(query):
        """Run translation and handle exceptions."""
        newly_translated = 0
        query_list = validate_input(query)
        if file_path and os.path.isfile(file_path):
            if verbose:
                print(f"Using translations from file: {file_path}.")
            # Check previous translations using this file
            result = read_result(file_path)
        else:
            result = {}
        # The finally clause makes sure store_result() is always
        # triggered, in order to store the progress; an exception raised
        # in the loop propagates after it has run.
        try:
            for string in query_list:
                if string not in result:
                    result[string] = get_translation(string)
                    newly_translated += 1
        finally:
            if file_path:
                store_result(result, newly_translated)
        if len(query_list) == 1:
            return result.get(query_list[0], f"<untranslated: {query_list[0]}>")
        return result

    return main(query)
def test_translate():
    """Translate four Dutch words and print a report when the result differs.

    A mismatch is only reported with ``print``; it is not re-raised.
    """
    words = ["Nederlandse", "Kaas", "Tarwe", "Huis"]
    expected_words = ["Dutch", "Cheese", "Wheat", "House"]
    expected_result = dict(zip(words, expected_words))
    try:
        true_result = translate(words)
        assert expected_result == true_result
    except AssertionError:
        print(f"Query:\n {words}")
        print(f"Expected translations:\n {expected_words}")
        print(f"Expected translation() result:\n {expected_result}")
        print(f"Actual translation() result:\n {true_result}")
# test_translate()
```
%% Cell type:code id: tags:
``` python
# test_translate()
```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment