import json
import pandas as pd
from datetime import datetime, timedelta
from personalcapital import APIConnector
from personalcapital.database import Database
DATE_FORMAT = '%Y-%m-%d'
DEFAULT_LOOKBACK_DAYS = 7
def make_enum_mapping(values, max_perc_unique=0.5):
    """
    Takes a list of string values and creates a dict mapping of str -> int,
    emulating the behavior of an Enum.

    :param iterable values:
        values to create a mapping for
    :param float max_perc_unique:
        maximum fraction (0-1) of unique values that justifies creating a mapping
    :return dict:
        the created mapping (if justified), otherwise None
    """
    # check that creating the mapping is justified
    unique_values = set(values)
    n_unique = len(unique_values)
    perc_unique = float(n_unique) / len(values)
    if perc_unique >= max_perc_unique:
        return None
    return {
        val: i
        for i, val in enumerate(unique_values)
    }
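
# Illustrative usage of make_enum_mapping (example values only, not from the
# module's data): a column with few distinct values gets a mapping, while a
# mostly-unique column returns None because the uniqueness threshold is hit.
#
#   make_enum_mapping(['debit', 'credit', 'debit', 'debit', 'debit'])
#   # -> e.g. {'debit': 0, 'credit': 1}  (integer codes follow set iteration order)
#   make_enum_mapping(['a', 'b', 'c', 'd'])
#   # -> None  (4/4 unique >= 0.5 threshold)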
# with open("etc/transactions.json") as fp:
# data = json.load(fp)['data']
# df = pd.DataFrame(data)
# import pdb; pdb.set_trace()
# print(make_enum_mapping(df["resultType"]))
def get_accounts(session):
    """Fetch all linked accounts, dump the raw response to accounts_response.json, and return the spData payload."""
    accounts_response = session.fetch('/newaccount/getAccounts')
    with open('accounts_response.json', 'w') as fp:
        json.dump(accounts_response.json(), fp, indent=4)
    accounts = accounts_response.json()['spData']
    print('Net worth: {0}'.format(accounts['networth']))
    return accounts
def fetch_and_dump(session, endpoint, fn):
    """Fetch `endpoint` and dump the pretty-printed JSON response to the file `fn`."""
    response = session.fetch(endpoint)
    with open(fn, 'w') as fp:
        json.dump(response.json(), fp, indent=4)
def get_transactions(session, start_date, end_date):
    """
    Fetch transactions between start_date and end_date (YYYY-MM-DD strings),
    dump the raw response to transactions_response.json, and return the list
    of transactions. Only the first page of up to 100 rows is requested.
    """
    transactions_response = session.fetch('/transaction/getUserTransactions', {
        'sort_cols': 'transactionTime',
        'sort_rev': 'true',
        'page': '0',
        'rows_per_page': '100',
        'startDate': start_date,
        'endDate': end_date,
        'component': 'DATAGRID'
    }).json()
    with open('transactions_response.json', 'w') as fp:
        json.dump(transactions_response, fp, indent=4)
    transaction_data = transactions_response['spData']
    print('Retrieved {} transactions between {} and {}.'.format(
        len(transaction_data['transactions']),
        transaction_data['startDate'],
        transaction_data['endDate']))
    return transaction_data['transactions']
def mongoize(transaction):
    """Rename userTransactionId to _id so the transaction can be stored directly as a MongoDB document."""
    transaction['_id'] = transaction.pop('userTransactionId')
    return transaction
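
# Illustrative example of mongoize (hypothetical field values):
#   mongoize({'userTransactionId': 123, 'amount': 9.99})
#   # -> {'_id': 123, 'amount': 9.99}
# MongoDB treats `_id` as the primary key, so reusing the transaction id
# should keep repeated loads from creating duplicate documents.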
def main():
    """Pull recent transactions via the Personal Capital API and dump them to transactions.json."""
    with APIConnector.connect() as session, Database() as db:
        # transactions = get_transactions(session, start_date, end_date)
        # fetch_and_dump(session, '/account/getHistories', 'histories_response.json')
        # fetch_and_dump(session, '/invest/getHoldings', 'holdings_response.json')
        # fetch_and_dump(session, '/account/getBillReminders', 'bill_reminders_response.json')
        # fetch_and_dump(session, '/message/getUserMessages', 'messages_response.json')
        # fetch_and_dump(session, '/newaccount/getAccounts2', 'accounts2_response.json')
        # fetch_and_dump(session, '/person/getPerson', 'person_response.json')
        # fetch_and_dump(session, '/transactioncategory/getCategories', 'categories_response.json')
        start_date = (datetime.today() - timedelta(days=DEFAULT_LOOKBACK_DAYS)).strftime(DATE_FORMAT)
        end_date = datetime.today().strftime(DATE_FORMAT)
        print("Querying for transactions between {} and {}.".format(start_date, end_date))
        transactions = list(map(
            mongoize,
            get_transactions(session, start_date, end_date)
        ))
        with open("transactions.json", "w") as fp:
            json.dump({"data": transactions}, fp, indent=4)
        if transactions:
            print(transactions[0])
        # transactions_db = db['banking']['transactions']
if __name__ == '__main__':
    main()