Source code for textverified.textverified

import datetime
from dataclasses import dataclass, field
from http.client import responses
from typing import Optional, Dict
from urllib.parse import urlparse

import dateutil.parser
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from .action import _ActionPerformer, _Action, _ActionResponse
from .account_api import AccountAPI
from .billing_cycle_api import BillingCycleAPI
from .exceptions import TextVerifiedError
from .reservations_api import ReservationsAPI
from .sales_api import SalesAPI
from .services_api import ServicesAPI
from .sms_api import SMSApi
from .verifications_api import VerificationsAPI
from .wake_api import WakeAPI
from .call_api import CallAPI


@dataclass(frozen=True)
class BearerToken:
    """Immutable pairing of a bearer token string with its expiry instant.

    Construction fails with ``ValueError`` unless ``expires_at`` is a
    ``datetime.datetime`` that carries timezone information.
    """

    token: str
    expires_at: datetime.datetime

    def __post_init__(self):
        expiry = self.expires_at
        # Reject anything that is not a datetime before touching .tzinfo.
        if not isinstance(expiry, datetime.datetime):
            raise ValueError("expires_at must be a datetime object")
        # A naive datetime cannot be compared against the aware "now" below.
        if expiry.tzinfo is None:
            raise ValueError("expires_at must be timezone-aware (UTC)")

    def is_expired(self) -> bool:
        """Return True once the current UTC time has reached ``expires_at``."""
        now_utc = datetime.datetime.now(tz=datetime.timezone.utc)
        return not (now_utc < self.expires_at)


@dataclass(frozen=False)
class TextVerified(_ActionPerformer):
    """API Context for interacting with the Textverified API.

    Holds the API credentials, a ``requests`` session with a retry policy, and
    the current bearer token. Each API area is exposed as a property that
    returns a fresh wrapper object bound to this context.
    """

    # repr=False keeps the secret out of the dataclass-generated __repr__,
    # which would otherwise print it in logs and tracebacks.
    api_key: str = field(repr=False)
    api_username: str
    base_url: str = "https://www.textverified.com"
    user_agent: str = "TextVerified-Python-Client/0.1.0"

    @property
    def account(self) -> AccountAPI:
        return AccountAPI(self)

    @property
    def billing_cycles(self) -> BillingCycleAPI:
        return BillingCycleAPI(self)

    @property
    def reservations(self) -> ReservationsAPI:
        return ReservationsAPI(self)

    @property
    def sales(self) -> SalesAPI:
        return SalesAPI(self)

    @property
    def services(self) -> ServicesAPI:
        return ServicesAPI(self)

    @property
    def verifications(self) -> VerificationsAPI:
        return VerificationsAPI(self)

    @property
    def wake_requests(self) -> WakeAPI:
        return WakeAPI(self)

    @property
    def sms(self) -> SMSApi:
        return SMSApi(self)

    @property
    def calls(self) -> CallAPI:
        return CallAPI(self)

    def __post_init__(self):
        self.bearer = None
        # Normalise so that joining with a leading-slash href never doubles "/".
        self.base_url = self.base_url.rstrip("/")

        # Mount session with basic retry strategy for 429 and 5xx errors
        # NOTE(review): urllib3's default allowed methods appear to exclude
        # POST, so status-based retries likely do not cover POST requests
        # (including the auth call) - confirm against the pinned urllib3.
        self.session = requests.Session()
        retry_strategy = Retry(
            total=3,
            status_forcelist=[429, 500, 502, 503, 504],
            backoff_factor=1,  # 1, 2, 4s
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    def refresh_bearer(self):
        """Refresh the bearer token, if expired. Called automatically before performing actions.

        :raises TextVerifiedError: if the auth endpoint answers with a JSON error object
        :raises requests.HTTPError: if the auth endpoint answers with any other non-2xx body
        """
        if self.bearer is not None and not self.bearer.is_expired():
            return

        auth_url = f"{self.base_url}/api/pub/v2/auth"
        response = self.session.post(
            auth_url,
            headers={"X-API-KEY": f"{self.api_key}", "X-API-USERNAME": f"{self.api_username}"},
            verify=self.__should_verify(auth_url),
        )
        TextVerified.__raise_for_status("POST", auth_url, response)
        data = response.json()
        self.bearer = BearerToken(token=data["token"], expires_at=dateutil.parser.parse(data["expiresAt"]))

    def _perform_action(self, action: _Action, **kwargs) -> _ActionResponse:
        """
        Perform an API action and return the result.

        Hrefs under ``base_url`` and hrefs without a scheme are sent with the
        bearer token; any other absolute URL is sent without authorization.

        :param action: The action to perform
        :param kwargs: Extra keyword arguments forwarded to ``Session.request``
        :return: The API response data and headers
        """
        href = action.href
        if self.__is_internal(href):
            return self.__perform_action_internal(action.method, href, **kwargs)
        if "://" in href:
            # Absolute URL on another origin: never attach the bearer token.
            return self.__perform_action_external(action.method, href, **kwargs)
        # No scheme: treat as a path relative to the API base URL.
        return self.__perform_action_internal(action.method, f"{self.base_url}{href}", **kwargs)

    def __is_internal(self, href: str) -> bool:
        """Return True when ``href`` points at ``base_url`` itself or something below it.

        A bare prefix match is not enough: ``https://api.example.com.evil.test``
        starts with ``https://api.example.com`` yet is a different host, and
        treating it as internal would hand it the bearer token. The character
        following the base URL must therefore end the authority component.
        """
        base = self.base_url
        return href == base or href.startswith((f"{base}/", f"{base}?", f"{base}#"))

    @staticmethod
    def __should_verify(url: str) -> bool:
        """Return False only when the host of ``url`` is exactly ``localhost``.

        Comparing the parsed hostname (not a string prefix) stops hosts such
        as ``localhost.evil.test`` from switching certificate checks off.
        """
        try:
            host = urlparse(url).hostname
        except ValueError:
            # Unparseable URL: keep verification on and let requests reject it.
            return True
        return host != "localhost"

    def __send(self, method: str, href: str, headers: dict, **kwargs) -> _ActionResponse:
        """Issue the request, raise on a non-2xx status, and wrap the response."""
        response = self.session.request(
            method=method,
            url=href,
            headers=headers,
            # Allow unverified certificates for localhost
            verify=self.__should_verify(href),
            **kwargs,
        )
        TextVerified.__raise_for_status(method, href, response)
        # An empty body becomes an empty dict, not a JSON decode error.
        return _ActionResponse(data=response.json() if response.text else {}, headers=response.headers)

    def __perform_action_internal(self, method: str, href: str, **kwargs) -> _ActionResponse:
        """Internal action performance with authorization"""
        # Check if bearer token is set and valid
        self.refresh_bearer()
        headers = {"Authorization": f"Bearer {self.bearer.token}", "User-Agent": self.user_agent}
        return self.__send(method, href, headers, **kwargs)

    def __perform_action_external(self, method: str, href: str, **kwargs) -> _ActionResponse:
        """External action performance without authorization"""
        return self.__send(method, href, {"User-Agent": self.user_agent}, **kwargs)

    @classmethod
    def __raise_for_status(cls, method: str, href: str, response: requests.Response):
        """Raise an exception for HTTP errors.

        :param method: HTTP method of the request, used in the error text
        :param href: URL of the request, used in the error text
        :param response: The response to inspect
        :raises TextVerifiedError: non-2xx status whose body is a JSON object
        :raises requests.HTTPError: non-2xx status with any other body
        """
        if 200 <= response.status_code <= 299:
            return

        # .get avoids a KeyError on codes missing from http.client's table.
        reason = responses.get(response.status_code, "Unknown")
        http_error_text = f"HTTP {response.status_code} ({reason}) for {method} {href}"

        # Only the parse sits in the try block, so the except clause cannot
        # catch the exception raised below.
        try:
            error_data = response.json()
        except ValueError:
            error_data = None

        if isinstance(error_data, dict):
            raise TextVerifiedError(
                error_code=error_data.get("errorCode"),
                error_description=error_data.get("errorDescription"),
                context=http_error_text,
            )

        # Body is not JSON, or is JSON but not an object (list, string, ...).
        raise requests.HTTPError(f"{http_error_text}:\n{response.text}", response=response)