# Source code for personalcapital.util.find_enums

import json
import pickle
import pandas as pd
from datetime import datetime, timedelta

from personalcapital import APIConnector
from personalcapital.database import Database

DATE_FORMAT = '%Y-%m-%d'
DEFAULT_LOOKBACK_DAYS = 7


def make_enum_mapping(values, max_perc_unique=0.5):
    """
    Takes a collection of values and creates a dict mapping of value -> int,
    emulating the behavior of an Enum.

    A mapping is only produced when the values repeat often enough, i.e. when
    the fraction of distinct values is strictly below ``max_perc_unique``.
    Codes are assigned in order of first appearance (on Python 3.7+, where
    dicts preserve insertion order), so the same input yields the same mapping
    on every run.

    :param <iterable> values: values to create a mapping for; any iterable of
        hashable items, including one-shot iterators
    :param max_perc_unique: maximum fraction (0.0 - 1.0) of unique values to
        justify creating a mapping
    :return dict: the created mapping (if justified), otherwise None. Empty
        input never justifies a mapping and returns None.
    """
    # Materialize once: generators have no len() and can only be walked once.
    values = list(values)
    if not values:
        # Nothing to map; also avoids dividing by zero below.
        return None

    # De-duplicate while keeping first-seen order, unlike set().
    unique_values = dict.fromkeys(values)

    # check that creating the mapping is justified
    perc_unique = float(len(unique_values)) / len(values)
    if perc_unique >= max_perc_unique:
        return None

    return {val: i for i, val in enumerate(unique_values)}
# with open("etc/transactions.json") as fp: # data = json.load(fp)['data'] # df = pd.DataFrame(data) # import pdb; pdb.set_trace() # print(make_enum_mapping(df["resultType"]))
def get_accounts(session):
    """
    Fetch the account summary, dump the raw response to disk and return its
    payload.

    The complete decoded response is written to ``accounts_response.json`` in
    the current working directory, and the reported net worth is printed.

    :param session: object exposing ``fetch(endpoint)`` that returns a
        response with a ``json()`` method
    :return: the ``spData`` section of the decoded response
    :raises KeyError: if the response has no ``spData`` or ``networth`` entry
    """
    # Decode the body once and reuse it for both the dump and the return
    # value. Decoding before opening the file also means a malformed response
    # does not leave a truncated dump behind.
    payload = session.fetch('/newaccount/getAccounts').json()

    with open('accounts_response.json', 'w') as fp:
        json.dump(payload, fp, indent=4)

    accounts = payload['spData']
    print('Networth: {0}'.format(accounts['networth']))
    return accounts
def fetch_and_dump(session, endpoint, fn):
    """
    Fetch ``endpoint`` through ``session`` and write the decoded JSON response
    to the file ``fn``, pretty-printed with a 4-space indent.

    :param session: object exposing ``fetch(endpoint)`` that returns a
        response with a ``json()`` method
    :param endpoint: endpoint path handed to ``session.fetch``
    :param fn: path of the file to (over)write
    """
    reply = session.fetch(endpoint)
    with open(fn, 'w') as sink:
        decoded = reply.json()
        json.dump(decoded, sink, indent=4)
def get_transactions(session, start_date, end_date):
    """
    Request the user's transactions for a date range, dump the raw response
    to ``transactions_response.json`` and return the transaction list.

    Only page 0 with 100 rows per page is requested, sorted on
    ``transactionTime`` with ``sort_rev`` set to ``'true'``.

    :param session: object exposing ``fetch(endpoint, data)`` that returns a
        response with a ``json()`` method
    :param start_date: value sent as ``startDate``
    :param end_date: value sent as ``endDate``
    :return: the ``transactions`` entry of the response's ``spData`` section
    """
    query = {
        'sort_cols': 'transactionTime',
        'sort_rev': 'true',
        'page': '0',
        'rows_per_page': '100',
        'startDate': start_date,
        'endDate': end_date,
        'component': 'DATAGRID'
    }
    reply = session.fetch('/transaction/getUserTransactions', query).json()

    # Keep a copy of the raw response on disk for inspection.
    with open('transactions_response.json', 'w') as sink:
        json.dump(reply, sink, indent=4)

    sp_data = reply['spData']
    found = sp_data['transactions']
    # Report the range echoed back by the server, not the requested one.
    summary = 'Retrieved {} transactions between {} and {}.'.format(
        len(found), sp_data['startDate'], sp_data['endDate'])
    print(summary)
    return found
def mongoize(transaction):
    """
    Re-key a transaction in place: the value stored under
    ``userTransactionId`` is moved to ``_id``.

    Presumably this makes the record usable as a MongoDB document keyed on
    the transaction id (inferred from the name; confirm against the caller).

    :param transaction: mutable mapping containing ``userTransactionId``
    :return: the same (mutated) mapping
    :raises KeyError: if ``userTransactionId`` is missing
    """
    transaction['_id'] = transaction.pop('userTransactionId')
    return transaction
def main():
    """
    Fetch the transactions of the last ``DEFAULT_LOOKBACK_DAYS`` days, re-key
    them with ``mongoize`` and write them to ``transactions.json`` as
    ``{"data": [...]}``.

    The first transaction is printed as a sample when there is one; an empty
    window is reported instead of raising.
    """
    with APIConnector.connect() as session, Database() as db:
        # Other endpoints that can be dumped for inspection:
        # fetch_and_dump(session, '/account/getHistories', 'histories_response.json')
        # fetch_and_dump(session, '/invest/getHoldings', 'holdings_response.json')
        # fetch_and_dump(session, '/account/getBillReminders', 'bill_reminders_response.json')
        # fetch_and_dump(session, '/message/getUserMessages', 'messages_response.json')
        # fetch_and_dump(session, '/newaccount/getAccounts2', 'accounts2_response.json')
        # fetch_and_dump(session, '/person/getPerson', 'person_response.json')
        # fetch_and_dump(session, '/transactioncategory/getCategories', 'categories_response.json')

        # Read the clock once so both ends of the window share the same
        # reference date even if the run straddles midnight.
        today = datetime.today()
        start_date = (today - timedelta(days=DEFAULT_LOOKBACK_DAYS)).strftime(DATE_FORMAT)
        end_date = today.strftime(DATE_FORMAT)
        print("Querying for transactions between {} and {}.".format(start_date, end_date))

        transactions = [
            mongoize(transaction)
            for transaction in get_transactions(session, start_date, end_date)
        ]

        with open("transactions.json", "w") as fp:
            json.dump({"data": transactions}, fp, indent=4)

        # The lookback window can be empty; indexing [0] unconditionally
        # would raise IndexError after the file was already written.
        if transactions:
            print(transactions[0])
        else:
            print("No transactions found between {} and {}.".format(start_date, end_date))

        # transactions_db = db['banking']['transactions']


if __name__ == '__main__':
    main()