import os, glob
import numpy as np
import pandas as pd
import click
import docutils.nodes
import docutils.parsers.rst
import docutils.utils
# Heading level (1-3) or 'text' -> xlsxwriter Format object.  Populated by
# readme_to_excel() once a workbook exists; read by rst_walker() when writing.
SHEET_FORMATS = {}
def parse_rst(text):
    """Parse an RST string and return the resulting docutils document tree."""
    rst_parser = docutils.parsers.rst.Parser()
    # Default settings for a parser-only pipeline.
    settings = docutils.frontend.OptionParser(
        components=(docutils.parsers.rst.Parser,),
    ).get_default_values()
    document = docutils.utils.new_document('<rst-doc>', settings=settings)
    rst_parser.parse(text, document)
    return document
def rst_walker(sheet_writer, section, level=0, row=0, col=0):
    """Recursively render a docutils node tree onto a worksheet.

    Writes titles (with a blank spacer row above, formatted per nesting
    level), paragraphs/inline text, bullet lists, and enumerated lists;
    anything else is descended into with ``level + 1``.  Returns the next
    free row index.
    """
    if not hasattr(section, 'children'):
        # Leaf node: write its text and advance one row.
        sheet_writer.write(row, col, section.astext().strip('\n'))
        return row + 1

    no_tag = object()  # sentinel for nodes without a .tagname attribute

    def flat(node):
        # Collapse a node's text onto a single line.
        return node.astext().strip('\n').replace('\n', ' ')

    for child in section.children:
        tag = getattr(child, 'tagname', no_tag)
        if tag == 'title':
            row += 1  # blank spacer row above every heading
            sheet_writer.write(row, col, child.astext().strip('\n'),
                               SHEET_FORMATS.get(level, None))
            row += 1
        elif tag is not no_tag and (tag in ('paragraph', '#text') or level > 3):
            # Deeply nested (level > 3) tagged nodes are flattened to text too.
            sheet_writer.write(row, col, flat(child),
                               SHEET_FORMATS.get('text', None))
            row += 1
        elif tag == 'bullet_list':
            body_fmt = SHEET_FORMATS.get('text', None)
            for item in child.children:
                sheet_writer.write(row, col, '• ' + flat(item), body_fmt)
                row += 1
        elif tag == 'enumerated_list':
            body_fmt = SHEET_FORMATS.get('text', None)
            for num, item in enumerate(child.children, start=1):
                sheet_writer.write(
                    row, col, '{}. '.format(num) + flat(item), body_fmt)
                row += 1
        else:
            # Unrecognized container: recurse one level deeper.
            row = rst_walker(sheet_writer, child, level + 1, row=row, col=col)
    return row
def readme_to_excel(readme_path, excel_writer, sheet_name='README', start_row=2, start_col=1):
    """Render an RST readme file into a worksheet of an open Excel writer.

    Parameters
    ----------
    readme_path : str
        Path to the RST readme file.
    excel_writer : pandas.ExcelWriter
        Open writer using the 'xlsxwriter' engine (its ``.book`` is an
        xlsxwriter Workbook).
    sheet_name : str
        Name of the worksheet to create (default 'README').
    start_row, start_col : int
        Zero-based cell at which rendering begins.
    """
    # Explicit encoding: readmes are assumed UTF-8 (previously relied on the
    # platform default, which breaks on Windows for non-ASCII readmes).
    with open(readme_path, 'r', encoding='utf-8') as f:
        doc = parse_rst(f.read())
    workbook = excel_writer.book
    # Use the local `workbook` handle consistently (was `excel_writer.book`).
    worksheet = workbook.add_worksheet(sheet_name)
    worksheet.set_column(1, 2, 60.)
    # Register heading formats keyed by nesting level, plus the body style,
    # for rst_walker to pick up.
    SHEET_FORMATS.update({
        1: workbook.add_format({
            'bold': True,
            'text_wrap': False,
            'valign': 'top',
            'font_size': 18}),
        2: workbook.add_format({
            'bold': True,
            'text_wrap': False,
            'valign': 'top',
            'font_size': 14}),
        3: workbook.add_format({
            'bold': True,
            'text_wrap': False,
            'valign': 'top',
            'font_size': 12}),
        'text': workbook.add_format({
            'bold': False,
            'text_wrap': True,
            'valign': 'top',
            'font_size': 11})})
    rst_walker(worksheet, doc, row=start_row, col=start_col)
def variable_to_excel(varname, root_dir, agglev, by='by_', scen=False, geog_cols=1, file_var=None):
    """Assemble one variable's csv outputs + readme into a single Excel file.

    Parameters
    ----------
    varname : str
        Variable name; used as the fallback for ``file_var``.
    root_dir : str
        Root data directory containing ``{by}{agglev}`` subdirectories.
    agglev : str
        Regional aggregation level (e.g. 'county', 'cbsa').
    by : str
        Prefix for the aggregation-level directory (default 'by_').
    scen : bool
        If True, expect scenario-named files (expected-emissions /
        high-emissions) instead of rcp names (rcp45 / rcp85).
    geog_cols : int
        Number of leading geography columns to promote to the index.
    file_var : str or None
        Variable name as used in csv file names; defaults to ``varname``.
    """
    if file_var is None:
        # Bug fix: a caller omitting file_var previously produced paths
        # containing the literal string 'None'.  Mirror the CLI fallback.
        file_var = varname
    writer = pd.ExcelWriter(
        os.path.join(root_dir, f'{by}{agglev}', f'{file_var}_{agglev}.xlsx'),
        engine='xlsxwriter')
    # The readme may be named '{file_var}_README.txt' or plain 'readme.txt'.
    option1 = os.path.join(
        root_dir,
        f'{by}{agglev}',
        f'{file_var}_README.txt')
    option2 = os.path.join(root_dir, f'{by}{agglev}', 'readme.txt')
    readme_to_excel(
        option1 if os.path.isfile(option1) else option2,
        writer)
    # Optional historical sheet (single 'observed' likelihood).
    hist_file = os.path.join(
        root_dir,
        f'{by}{agglev}',
        f'{file_var}_{agglev}_historical_1970-1990.csv')
    if os.path.isfile(hist_file):
        hist_data = pd.read_csv(hist_file).rename_axis('_INDEX')
        # Promote the first `geog_cols` columns to the index.
        hist_data = hist_data.set_index(
            list(np.array(list(hist_data.columns))[list(range(geog_cols))]),
            append=True)
        hist = pd.concat(
            {'historical': pd.concat(
                {'1970-1990': pd.concat(
                    {'observed': hist_data},
                    names=['likelihood'])},
                names=['period'])},
            names=['scenario'])
        (
            hist
            .xs('1970-1990', level='period')
            .unstack(['scenario', 'likelihood'])
            .reset_index('_INDEX', drop=True)
            .to_excel(writer, sheet_name='1970-1990'))
    dfs = {}
    periods = ['2010-2030', '2020-2040', '2040-2060', '2060-2080', '2080-2100']
    rcps = ['expected-emissions', 'high-emissions'] if scen else ['rcp45', 'rcp85']
    for rcp in rcps:
        dfp = {}
        for period in periods:
            fp = os.path.join(
                root_dir,
                f'{by}{agglev}',
                f'{file_var}_{agglev}_{rcp}_{period}.csv')
            if os.path.isfile(fp):
                df = pd.read_csv(fp)
                # assumes each csv stacks 5 equal-length quantile blocks
                # (len(df) divisible by 5) -- TODO confirm against the writer
                # that produces these files.
                df = df.set_index(pd.Index(
                    np.hstack([np.arange(len(df)//5) for _ in range(5)]),
                    name='_INDEX'))
                df = df.set_index(
                    list(df.columns.values[list(range(geog_cols + 1))]),
                    append=True)
                dfp[period] = df
            else:
                print('nooop: {}'.format(fp))
        if len(dfp) > 0:
            dfs[rcp] = pd.concat(dfp, names=['period'])
    proj = pd.concat(dfs, names=['scenario'])
    proj.index.set_names('likelihood', level='quantile', inplace=True)
    # One sheet per projection period, scenario x likelihood as columns.
    for period in proj.index.get_level_values('period').unique():
        (
            proj
            .xs(period, level='period')
            .unstack(['scenario', 'likelihood'])
            .reset_index('_INDEX', drop=True)
            .to_excel(writer, sheet_name=period))
    # close() saves and releases the file; ExcelWriter.save() was removed in
    # pandas 2.0, while close() exists in both old and new pandas.
    writer.close()
@click.command()
@click.argument('varname')
@click.argument('root_dir')
@click.argument('agglev')
@click.option('--by', default='by_', help='Optional prefix for agglev (default "by_{AGGLEV}")')
@click.option('--scen/--no-scen', default=False, is_flag=True, help=(
    'Use scenario names (e.g. high-emissions). Default is '
    'to use rcp names (e.g. rcp85). This should reflect the '
    'input file names/contents... the output will match the '
    'inputs.'))
@click.option('--geog_cols', default=1, help='number of index columns to read in geography (default 1)', type=int)
@click.option('--file-var', default=None, help='varname used in csv file names (default VARNAME)')
def to_excel(varname, root_dir, agglev, by='by_', scen=False, geog_cols=1, file_var=None):
    '''
    Converts a standard Rhodium Climate Risk Service output csv+readme
    directory into a single excel file.
    Accepts as arguments the variable name (used in filenames), root directory
    of data (containing agglev directories), and the regional aggregation level
    (e.g. county, cbsa)
    Example usage:
    python build_excel.py total-economic-impact-as-share-of-GDP . cbsa
    '''
    # Thin CLI wrapper: delegate to variable_to_excel, defaulting the
    # csv-filename variable to the display variable name.
    variable_to_excel(
        varname,
        root_dir,
        agglev,
        by=by,
        scen=scen,
        geog_cols=geog_cols,
        file_var=varname if file_var is None else file_var)
# Script entry point: invoke the click command when run directly.
if __name__ == "__main__":
    to_excel()