Source code for personalcapital.connection

import re
import os
import json
import logging
import getpass
import requests
from os import path

from personalcapital.exceptions import (
    RequireTwoFactorException,
    LoginFailedException
)

from personalcapital.constants import (
    BASE_URL,
    API_ENDPOINT
)

THIS_DIR = path.dirname(__file__)


class AuthLevelEnum(object):
    """Auth-level values reported by the identify-user endpoint.

    ``APIConnector.login`` compares the ``authLevel`` header value of the
    identify-user response against these constants.
    """

    # Any other auth level makes ``login`` raise RequireTwoFactorException.
    USER_REMEMBERED = "USER_REMEMBERED"
class TwoFactorVerificationModeEnum(object):
    """Delivery modes accepted by the two-factor challenge/authenticate calls.

    ``APIConnector.two_factor_challenge`` and
    ``APIConnector.two_factor_authenticate`` dispatch on these values.
    """

    SMS = 0
    # PHONE = 1  -- left disabled; the connector has no phone-call handlers.
    EMAIL = 2
class APIConnector(object):
    """Personal Capital API connector.

    Drives the login flow against the API: scrape the initial CSRF token
    from the home page, identify the user, optionally run a two-factor
    challenge, and authenticate with the password.  All requests go through
    a :class:`CookieSession`, exposed as :attr:`session`.
    """

    # cross-site request forgery token params
    CSRF_KEY = "csrf"
    CSRF_TOKEN_REGEXP = re.compile(r"globals.csrf='([a-f0-9-]+)'")
    AUTH_LEVEL_KEY = "authLevel"
    SUCCESS_KEY = "success"
    SP_HEADER_KEY = "spHeader"
    ERRORS_KEY = "errors"

    def __init__(self):
        self.__session = CookieSession(API_ENDPOINT)

    def login(self, username, password):
        """Log in with *username* and *password*.

        Raises:
            RequireTwoFactorException: the reported auth level is not
                ``AuthLevelEnum.USER_REMEMBERED``; the caller must complete
                a two-factor challenge and then call
                :meth:`authenticate_password`.
            LoginFailedException: no CSRF token / auth level could be
                obtained, or the password response reports
                ``success == False``.
        """
        initial_csrf = self.__get_csrf_from_home_page(BASE_URL)
        csrf, auth_level = self.__identify_user(username, initial_csrf)

        if not (csrf and auth_level):
            raise LoginFailedException("Did not acquire CSRF token and auth level.")

        # The token returned by identifyUser is the one reused for every
        # subsequent request in this session.
        self.session.csrf = csrf
        if auth_level != AuthLevelEnum.USER_REMEMBERED:
            raise RequireTwoFactorException()

        result = self.__authenticate_password(password).json()
        # Only an explicit ``False`` counts as failure; a missing flag does not.
        if self.__get_header_value(result, APIConnector.SUCCESS_KEY) is False:
            raise LoginFailedException(self.__get_error_value(result))

    def authenticate_password(self, password):
        """POST the password-authentication request and return the response."""
        return self.__authenticate_password(password)

    def two_factor_authenticate(self, mode, code):
        """Submit the two-factor *code* for the given *mode*.

        Returns the HTTP response, or ``None`` when *mode* is neither
        ``TwoFactorVerificationModeEnum.SMS`` nor ``.EMAIL``.
        """
        if mode == TwoFactorVerificationModeEnum.SMS:
            return self.__authenticate_sms(code)
        elif mode == TwoFactorVerificationModeEnum.EMAIL:
            return self.__authenticate_email(code)
        return None

    def two_factor_challenge(self, mode):
        """Request a two-factor challenge for the given *mode*.

        Returns the HTTP response, or ``None`` when *mode* is neither
        ``TwoFactorVerificationModeEnum.SMS`` nor ``.EMAIL``.
        """
        if mode == TwoFactorVerificationModeEnum.SMS:
            return self.__challenge_sms()
        elif mode == TwoFactorVerificationModeEnum.EMAIL:
            return self.__challenge_email()
        return None

    @property
    def csrf(self):
        """CSRF token stored on the underlying session."""
        return self.session.csrf

    @csrf.setter
    def csrf(self, csrf):
        self.session.csrf = csrf

    @property
    def session(self):
        """The :class:`CookieSession` used for all requests."""
        return self.__session

    @classmethod
    def connect(cls):
        """Interactively log in and return the authenticated session.

        Credentials come from :meth:`get_user_credentials`.  When the login
        requires two-factor verification, an SMS challenge is requested and
        the code is read from standard input before the password is
        submitted.
        """
        pc = cls()
        user, pw = cls.get_user_credentials()
        try:
            pc.login(user, pw)
        except RequireTwoFactorException:
            pc.two_factor_challenge(TwoFactorVerificationModeEnum.SMS)
            pc.two_factor_authenticate(TwoFactorVerificationModeEnum.SMS, input('SMS code: '))
            pc.authenticate_password(pw)
        return pc.session

    @staticmethod
    def get_user_credentials():
        """Return ``(email, password)`` from the environment or by prompting.

        ``PEW_EMAIL`` and ``PEW_PASSWORD`` are read from the environment;
        whichever one is missing or empty is asked for interactively.  A
        2-tuple is always returned so callers can unpack the result.
        """
        email = os.getenv('PEW_EMAIL')
        password = os.getenv('PEW_PASSWORD')
        if not email:
            print("You can set the environment variables for PEW_EMAIL and PEW_PASSWORD so "
                  "the prompts don't come up every time")
            email = input('Enter email:')
        if not password:
            password = getpass.getpass('Enter password:')
        return email, password

    @classmethod
    def __get_header_value(cls, result, valueKey):
        """Return ``result["spHeader"][valueKey]``, or ``None`` if absent."""
        if (cls.SP_HEADER_KEY in result) and (valueKey in result[cls.SP_HEADER_KEY]):
            return result[cls.SP_HEADER_KEY][valueKey]
        return None

    @classmethod
    def __get_error_value(cls, result):
        """Return the first error message in the response header, or ``None``."""
        try:
            return cls.__get_header_value(result, cls.ERRORS_KEY)[0]['message']
        except (TypeError, KeyError, ValueError, IndexError):
            # TypeError: no errors entry at all (None is not subscriptable).
            # KeyError: the first error carries no 'message' field.
            return None

    def __get_csrf_from_home_page(self, url):
        """Fetch *url* and extract the CSRF token embedded in the page text."""
        r = self.__session.session.get(url)
        found_csrf = APIConnector.CSRF_TOKEN_REGEXP.search(r.text)
        if found_csrf:
            return found_csrf.group(1)
        return None

    def __identify_user(self, username, csrf):
        """
        Returns reusable CSRF code and the auth level as a 2-tuple
        """
        data = {
            "username": username,
            "csrf": csrf,
            "apiClient": "WEB",
            "bindDevice": "false",
            "skipLinkAccount": "false",
            "redirectTo": "",
            "skipFirstUse": "",
            "referrerId": "",
        }

        r = self.session.post("/login/identifyUser", data)

        if r.status_code == requests.codes.ok:
            result = r.json()
            new_csrf = self.__get_header_value(result, APIConnector.CSRF_KEY)
            auth_level = self.__get_header_value(result, APIConnector.AUTH_LEVEL_KEY)
            return (new_csrf, auth_level)

        return (None, None)

    def __generate_challenge_payload(self, challenge_type):
        """Build the form payload for a two-factor challenge request."""
        return {
            "challengeReason": "DEVICE_AUTH",
            "challengeMethod": "OP",
            "challengeType": challenge_type,
            "apiClient": "WEB",
            "bindDevice": "false",
            "csrf": self.csrf
        }

    def __generate_authentication_payload(self, code):
        """Build the form payload for submitting a two-factor code."""
        return {
            "challengeReason": "DEVICE_AUTH",
            "challengeMethod": "OP",
            "apiClient": "WEB",
            "bindDevice": "false",
            "code": code,
            "csrf": self.csrf
        }

    def __challenge_email(self):
        data = self.__generate_challenge_payload("challengeEmail")
        return self.session.post("/credential/challengeEmail", data)

    def __authenticate_email(self, code):
        data = self.__generate_authentication_payload(code)
        return self.session.post("/credential/authenticateEmail", data)

    def __challenge_sms(self):
        data = self.__generate_challenge_payload("challengeSMS")
        return self.session.post("/credential/challengeSms", data)

    def __authenticate_sms(self, code):
        data = self.__generate_authentication_payload(code)
        return self.session.post("/credential/authenticateSms", data)

    def __authenticate_password(self, passwd):
        """POST the password together with the current CSRF token."""
        data = {
            "bindDevice": "true",
            "deviceName": "",
            "redirectTo": "",
            "skipFirstUse": "",
            "skipLinkAccount": "false",
            "referrerId": "",
            "passwd": passwd,
            "apiClient": "WEB",
            "csrf": self.csrf
        }
        return self.session.post("/credential/authenticatePassword", data)
class CookieSession(object):
    """A ``requests.Session`` wrapper that persists cookies to a JSON file.

    Cookies are loaded from the session file on construction and written
    back by :meth:`save` (also invoked when leaving a ``with`` block).  The
    wrapper also carries the API endpoint prefix and the current CSRF token.
    """

    # Default cookie store, located next to this module.
    SESSION_FN = path.join(THIS_DIR, "session.json")

    def __init__(self, api_endpoint):
        self.__session = requests.Session()
        self.__session_file = CookieSession.SESSION_FN
        self.__api_endpoint = api_endpoint
        self.__csrf = ""
        self.load()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Persist cookies on exit, whether or not an exception occurred.
        self.save()

    @property
    def api_endpoint(self):
        """URL prefix prepended to every endpoint passed to :meth:`post`."""
        return self.__api_endpoint

    @api_endpoint.setter
    def api_endpoint(self, new_endpoint):
        self.__api_endpoint = new_endpoint

    @property
    def session(self):
        """The underlying ``requests.Session`` (read-only)."""
        return self.__session

    @session.setter
    def session(self, new_session):
        raise NotImplementedError("Re-assigning internal session not supported.")

    @property
    def cookies(self):
        """Session cookies as a plain ``dict``."""
        return requests.utils.dict_from_cookiejar(self.session.cookies)

    @cookies.setter
    def cookies(self, cookies):
        self.session.cookies = requests.utils.cookiejar_from_dict(cookies)

    @property
    def csrf(self):
        """CSRF token sent with API requests."""
        return self.__csrf

    @csrf.setter
    def csrf(self, csrf):
        self.__csrf = csrf

    def fetch(self, endpoint, data=None):
        """
        for getting data after logged in
        """
        payload = {
            "lastServerChangeId": "-1",
            "csrf": self.csrf,
            "apiClient": "WEB"
        }
        if data is not None:
            # Caller-supplied fields override the defaults above.
            payload.update(data)

        return self.post(endpoint, payload)

    def post(self, endpoint, data):
        """POST *data* to ``api_endpoint + endpoint`` and return the response."""
        return self.session.post(self.api_endpoint + endpoint, data)

    def load(self, session_file=None):
        """Load cookies from *session_file* (default: the file ``save`` writes).

        A missing file is ignored.  A file that is not valid JSON, or whose
        JSON is not an object, is logged and treated as an empty cookie set.
        Returns ``self`` so calls can be chained.
        """
        # Default to the same path save() writes so load/save round-trip.
        fn = session_file or self.__session_file
        if path.isfile(fn):
            cookies = {}
            with open(fn) as data_file:
                try:
                    cookies = json.load(data_file)
                except ValueError as err:
                    logging.error(err)
            if not isinstance(cookies, dict):
                # cookiejar_from_dict needs a mapping; anything else is unusable.
                logging.error("Session file %s does not contain a JSON object", fn)
                cookies = {}
            self.cookies = cookies
        return self

    def save(self):
        """Write the current cookies to the session file as JSON."""
        # Serialise first so a failure cannot leave a truncated file behind.
        serialized = json.dumps(self.cookies)
        with open(self.__session_file, 'w') as data_file:
            data_file.write(serialized)