# Source code for dkutils.veeva_network_api.veeva_network_client

import logging
import time
import os
from enum import Enum

import requests

logger = logging.getLogger(__name__)

# Job statuses at which polling stops. Only "COMPLETE" is treated as success;
# any other status in this set makes the polling code raise VeevaNetworkException.
TERMINAL_STATES = {
    "CANCELED", "COMPLETE", "FAILED", "INTERRUPTED", "KILLED", "PARKED", "PAUSED", "SUSPENDED"
}

# API version used when neither the caller nor the VEEVA_VERSION environment
# variable supplies one.
DEFAULT_VERSION = "v16.0"


def _raise_exception(msg):
    """
    Log ``msg`` at ERROR level and raise it as a VeevaNetworkException.

    Parameters
    ----------
    msg : str
        The message to log and to carry in the raised exception.

    Raises
    ------
    VeevaNetworkException
        Always.
    """
    error = VeevaNetworkException(msg)
    logger.error(msg)
    raise error


def create_veeva_network_subscription_client(
    dns=None,
    username=None,
    password=None,
    subscription_name=None,
    subscription_type=None,
    system_name=None,
    version=None
):
    """
    Create a client that enables you to manage subscriptions that import and export data
    to and from the Veeva Network.

    Every argument is optional; an absent (falsy) argument is read from the environment.

    Parameters
    ----------
    dns : str, opt
        The URL for your API service. If absent, the value is read from the environment
        variable VEEVA_DNS.
    username : str, opt
        The user ID for Network; for example, john.smith@veevanetwork.com. If absent, the
        value is read from the environment variable VEEVA_USERNAME.
    password : str, opt
        The password for the user ID. If absent, the value is read from the environment
        variable VEEVA_PASSWORD.
    subscription_name : str, opt
        The unique name of the subscription. If absent, the value is read from the
        environment variable VEEVA_SUBSCRIPTION_NAME.
    subscription_type : VeevaNetworkSubscriptionType, opt
        The type of subscription. If absent, the upper-cased value of the environment
        variable VEEVA_SUBSCRIPTION_TYPE is looked up as a member name of
        VeevaNetworkSubscriptionType.
    system_name : str, opt
        The unique name of the system. If absent, the value is read from the environment
        variable VEEVA_SYSTEM_NAME.
    version : str, opt
        The API version. If absent, the value is read from the environment variable
        VEEVA_VERSION, falling back to DEFAULT_VERSION when that is not set either.

    Returns
    -------
    VeevaSourceSubscriptionClient or VeevaTargetSubscriptionClient
        The client matching ``subscription_type``.

    Raises
    ------
    KeyError
        If a required value is neither passed in nor present in the environment, or if
        VEEVA_SUBSCRIPTION_TYPE does not name a VeevaNetworkSubscriptionType member.
    HTTPError
        If the authentication request fails.
    VeevaNetworkException
        If subscription_type is not VeevaNetworkSubscriptionType.SOURCE or
        VeevaNetworkSubscriptionType.TARGET.
        If the authorization header is not available.
    """
    # Resolve the fallbacks in a fixed order so that the first missing
    # environment variable is the one reported by the KeyError.
    dns = dns or os.environ['VEEVA_DNS']
    username = username or os.environ['VEEVA_USERNAME']
    password = password or os.environ['VEEVA_PASSWORD']
    system_name = system_name or os.environ['VEEVA_SYSTEM_NAME']
    subscription_name = subscription_name or os.environ['VEEVA_SUBSCRIPTION_NAME']
    subscription_type = subscription_type or VeevaNetworkSubscriptionType[
        os.environ['VEEVA_SUBSCRIPTION_TYPE'].upper()
    ]
    version = version or os.environ.get("VEEVA_VERSION", DEFAULT_VERSION)

    # Compare with == (not a dict lookup): the enum mixes in str, so a plain
    # 'source' / 'target' string compares equal to the matching member.
    if subscription_type == VeevaNetworkSubscriptionType.SOURCE:
        client_class = VeevaSourceSubscriptionClient
    elif subscription_type == VeevaNetworkSubscriptionType.TARGET:
        client_class = VeevaTargetSubscriptionClient
    else:
        _raise_exception(
            f"Subscription type must be either VeevaNetworkSubscriptionType.SOURCE or "
            f"VeevaNetworkSubscriptionType.TARGET but was {subscription_type}"
        )

    return client_class(
        dns=dns,
        username=username,
        password=password,
        subscription_name=subscription_name,
        system_name=system_name,
        version=version
    )
def run_subscription_job(
    dns=None,
    username=None,
    password=None,
    subscription_name=None,
    subscription_type=None,
    system_name=None,
    version=None
):
    """
    Run a subscription job and poll until the job has reached a terminal state.

    All arguments are forwarded unchanged to create_veeva_network_subscription_client,
    which reads any absent value from the environment.

    Parameters
    ----------
    dns : str, opt
        The URL for your API service. If absent, the value is read from the environment
        variable VEEVA_DNS.
    username : str, opt
        The user ID for Network; for example, john.smith@veevanetwork.com. If absent, the
        value is read from the environment variable VEEVA_USERNAME.
    password : str, opt
        The password for the user ID. If absent, the value is read from the environment
        variable VEEVA_PASSWORD.
    subscription_name : str, opt
        The unique name of the subscription. If absent, the value is read from the
        environment variable VEEVA_SUBSCRIPTION_NAME.
    subscription_type : VeevaNetworkSubscriptionType, opt
        The type of subscription. If absent, the value is read from the environment
        variable VEEVA_SUBSCRIPTION_TYPE.
    system_name : str, opt
        The unique name of the system. If absent, the value is read from the environment
        variable VEEVA_SYSTEM_NAME.
    version : str, opt
        The API version. If absent, the value is read from the environment variable
        VEEVA_VERSION.

    Raises
    ------
    HTTPError
        If a request fails.
    VeevaNetworkException
        If the job cannot be started, its status cannot be retrieved, or it reaches a
        terminal status other than COMPLETE.

    Returns
    -------
    dict
        A dictionary containing the information about the job.
        For a source subscription it will be similar to the following::

            {
                "filesProcessed": 1,
                "errorCount": 0,
                "subscriptionName": "CRM_Import",
                "completed_date": "2019-12-18T22:42:32.000Z",
                "job_status": "COMPLETE",
                "job_id": 10537,
                "jobResultSummary": {
                    "CUSTOMKEY": {
                        "total": 2,
                        "recordsUpdated": 0,
                        "recordsSkipped": 2,
                        "newRecordsAdded": 0,
                        "recordsInvalidated": 0,
                        "recordsMerged": 0
                    },
                    "HCP": {
                        "total": 1,
                        "recordsUpdated": 0,
                        "recordsSkipped": 1,
                        "recordsMerged": 0,
                        "recordsInvalidated": 0,
                        "newRecordsAdded": 0
                    },
                    "ADDRESS": {
                        "total": 1,
                        "recordsUpdated": 0,
                        "recordsSkipped": 1,
                        "recordsInvalidated": 0,
                        "newRecordsAdded": 0,
                        "recordsMerged": 0
                    }
                },
                "durationInMilliseconds": 3000,
                "created_date": "2019-12-18T22:42:29.000Z",
                "subscriptionId": 117,
                "processedDataSummary": {"HCP": 1, "ADDRESS": 1},
                "type": "MANUAL",
                "dataLoadSummary": {
                    "ADDRESS": {"rowsRead": 1, "rowsParsed": 1},
                    "HCP": {"rowsParsed": 1, "rowsRead": 1}
                },
                "badRecordCount": 0,
                "recordCount": 2,
                "matchSummary": {
                    "HCP": {"ACT": 1, "notMatched": 0, "ASK": 0},
                    "HCO": {"ACT": 0, "notMatched": 0, "ASK": 0}
                },
                "responseStatus": "SUCCESS"
            }

        For a target subscription a dictionary similar to the following is returned::

            {
                "responseStatus": "SUCCESS",
                "subscriptionId": 15,
                "subscriptionName": "targetSubscriptionCustomer",
                "durationInMilliseconds": 2000,
                "type": "MANUAL",
                "errorCount": 0,
                "badRecordCount": 0,
                "exportReferenceCount": 0,
                "exportFull": True,
                "exportIncludeReference": False,
                "exportUpdatedChildOnly": False,
                "exportSetSubscriptionStateOnFull": False,
                "exportFormat": "CSV",
                "exportReferenceVersion": "4",
                "exportActiveOnly": False,
                "jobExportCount": {
                    "LICENSE": 3961,
                    "RELATION": 333,
                    "HCO": 819,
                    "HCP": 1060,
                    "ADDRESS": 1801,
                    "EXTERNALKEYS": 8038
                },
                "job_id": 10563,
                "job_status": "COMPLETE",
                "created_date": "2016-11-17T10:58:49.000-08:00",
                "data_revision_first": "0",
                "data_revision_last": "929335226137870335",
                "export_package_path":
                    "export/change_request/targetSubscriptionCustomer/exp_000001C5.zip",
                "total_records_exported": "1879",
                "completed_date": "2016-11-17T10:58:51.000-08:00",
                "export_archive": "individual",
                "exportFormatDelimiter": "|",
                "exportFormatTextQualifier": "\""
            }
    """
    client_settings = {
        'dns': dns,
        'username': username,
        'password': password,
        'subscription_name': subscription_name,
        'subscription_type': subscription_type,
        'system_name': system_name,
        'version': version,
    }
    subscription_client = create_veeva_network_subscription_client(**client_settings)

    # Start the job, then block until the client reports a terminal state.
    job_id = subscription_client.run_subscription_process()
    return subscription_client.retrieve_network_process_job(job_id)
class VeevaNetworkException(Exception):
    """
    Error raised by this module when a Veeva Network operation cannot proceed.

    It is raised for an unsupported subscription type, a missing authorization
    header, a job that cannot be started or queried, and a job that ends in a
    terminal status other than COMPLETE.
    """
class VeevaNetworkSubscriptionType(str, Enum):
    """
    The kind of Veeva Network subscription a client operates on.

    The members mix in ``str``, so each member compares equal to its plain string
    value. The value is interpolated into the request URL as
    ``<value>_subscriptions``.
    """

    # No trailing comma after the value: a comma would declare a 1-tuple, which
    # only resolves to the intended string because of the str mixin.
    SOURCE = 'source'
    TARGET = 'target'
class VeevaNetworkClient:
    """
    Base client that authenticates against the Veeva Network API.

    This class should not be instantiated directly. You should use either
    VeevaSourceSubscriptionClient or VeevaTargetSubscriptionClient.
    """

    def __init__(self, dns, username, password, version):
        """
        Create a client for accessing Veeva Network and authenticate immediately.

        On success the instance exposes ``base_url`` (the versioned API root) and
        ``admin_header`` (the headers to send with authenticated requests).

        Parameters
        ----------
        dns : str
            The URL for your API service.
        username : str
            The user ID for Network; for example, john.smith@veevanetwork.com.
        password : str
            The password for the user ID.
        version : str
            The API version. A falsy value selects DEFAULT_VERSION.

        Raises
        ------
        HTTPError
            If the authentication request fails.
        VeevaNetworkException
            If the authentication response does not contain a session id, so no
            authorization header is available.
        """
        # Reuse the module-wide default instead of a second hard-coded literal.
        api_version = version if version else DEFAULT_VERSION
        self.base_url = f'https://{dns}/api/{api_version}/'

        logger.info('VEEVA NETWORK: Attempting Authenticating')
        response = requests.post(
            self.base_url + 'auth',
            data={
                'username': username,
                'password': password
            }
        )
        response.raise_for_status()

        # The session id is passed through as the request authorization header.
        self.admin_header = {
            'Authorization': response.json().get('sessionId'),
            'Content-type': 'application/json',
            'Accept': 'application/json'
        }
        if self.admin_header['Authorization'] is None:
            _raise_exception('Could not get an authorization header')
        else:
            logger.info('VEEVA NETWORK: Authentication Successful!')
class VeevaSourceSubscriptionClient(VeevaNetworkClient):
    """Client that starts source subscription jobs and polls them to completion."""

    def __init__(
        self, dns, username, password, subscription_name, system_name, version=DEFAULT_VERSION
    ):
        """
        Create a client that enables you to manage source subscriptions that import and
        export data to and from the Veeva Network.

        Parameters
        ----------
        dns : str
            The URL for your API service.
        username : str
            The user ID for Network; for example, john.smith@veevanetwork.com.
        password : str
            The password for the user ID.
        subscription_name : str
            The unique name of the subscription.
        system_name : str
            The unique name of the system.
        version : str
            The API version.

        Raises
        ------
        HTTPError
            If the authentication request fails.
        VeevaNetworkException
            If the authorization header is not available.
        """
        self.subscription_name = subscription_name
        self.system_name = system_name
        self.subscription_type = VeevaNetworkSubscriptionType.SOURCE.value
        super().__init__(dns, username, password, version)

    def run_subscription_process(self):
        """
        Create a job for the configured subscription.

        Note: This API requires system administrator or API-only permissions.

        Raises
        ------
        HTTPError
            If the request fails.
        VeevaNetworkException
            If the response status is not SUCCESS.

        Returns
        -------
        The ``job_id`` field of the API response, i.e. the unique ID of the job
        generated for the subscription (``None`` if the response carries no
        ``job_id``).
        """
        response = requests.post(
            f'{self.base_url}systems/{self.system_name}/{self.subscription_type}_subscriptions/'
            f'{self.subscription_name}/job',
            headers=self.admin_header
        )
        response.raise_for_status()

        payload = response.json()
        # Use .get() so that a response lacking one of these fields still surfaces
        # as a VeevaNetworkException instead of a KeyError that hides the failure.
        status = payload.get('responseStatus')
        if status != 'SUCCESS':
            _raise_exception(
                f"The job could not be started. Status: {status} - "
                f"{payload.get('responseMessage')}"
            )
        return payload.get('job_id')

    def _retrieve_network_process_job(self, job_resp_id, sleep_seconds=5):
        """
        Retrieve the status of a source or target subscription job.

        This function will poll until the job has reached a terminal state.

        Parameters
        ----------
        job_resp_id : str
            The unique ID of the job you want to retrieve status on.
        sleep_seconds : float, optional
            Number of seconds to sleep between polls for the job status. The argument
            may be a floating point number for subsecond precision.

        Raises
        ------
        HTTPError
            If the request fails.
        VeevaNetworkException
            If the status cannot be retrieved or the job reaches a terminal status
            other than COMPLETE.

        Returns
        -------
        dict
            A dictionary containing the information about the job.
        """
        # The URL does not change between polls, so build it once.
        job_url = (
            f'{self.base_url}systems/{self.system_name}/{self.subscription_type}_subscriptions/'
            f'{self.subscription_name}/job/{job_resp_id}'
        )

        while True:
            response = requests.get(job_url, headers=self.admin_header)
            response.raise_for_status()

            payload = response.json()
            # Use .get() so that a response lacking one of these fields still
            # surfaces as a VeevaNetworkException instead of a KeyError.
            status = payload.get('responseStatus')
            if status != 'SUCCESS':
                _raise_exception(
                    f"The job status could not be retrieved. Status: {status} - "
                    f"{payload.get('errorType')}"
                )

            job_status = payload.get('job_status')
            if job_status == 'COMPLETE':
                return payload
            if job_status == 'SUSPENDED':
                _raise_exception(
                    "VEEVA NETWORK: The job is scheduled and is waiting for shared resources prior to "
                    "becoming active."
                )
            if job_status in TERMINAL_STATES:
                _raise_exception(
                    f'The job has terminated with an unexpected status: {job_status}'
                )

            # Not terminal yet: report progress and poll again after a pause.
            logger.info(job_status)
            time.sleep(sleep_seconds)

    def retrieve_network_process_job(self, job_resp_id):
        """
        Retrieve the status of a source or target subscription job.

        This function will poll until the job has reached a terminal state.

        Parameters
        ----------
        job_resp_id : str
            The unique ID of the job you want to retrieve status on.

        Raises
        ------
        HTTPError
            If the request fails.
        VeevaNetworkException
            If the status cannot be retrieved or the job reaches a terminal status
            other than COMPLETE.

        Returns
        -------
        dict
            A dictionary containing the information about the job::

                {
                    "filesProcessed": 1,
                    "errorCount": 0,
                    "subscriptionName": "CRM_Import",
                    "completed_date": "2019-12-18T22:42:32.000Z",
                    "job_status": "COMPLETE",
                    "job_id": 10537,
                    "jobResultSummary": {
                        "CUSTOMKEY": {
                            "total": 2,
                            "recordsUpdated": 0,
                            "recordsSkipped": 2,
                            "newRecordsAdded": 0,
                            "recordsInvalidated": 0,
                            "recordsMerged": 0
                        },
                        "HCP": {
                            "total": 1,
                            "recordsUpdated": 0,
                            "recordsSkipped": 1,
                            "recordsMerged": 0,
                            "recordsInvalidated": 0,
                            "newRecordsAdded": 0
                        },
                        "ADDRESS": {
                            "total": 1,
                            "recordsUpdated": 0,
                            "recordsSkipped": 1,
                            "recordsInvalidated": 0,
                            "newRecordsAdded": 0,
                            "recordsMerged": 0
                        }
                    },
                    "durationInMilliseconds": 3000,
                    "created_date": "2019-12-18T22:42:29.000Z",
                    "subscriptionId": 117,
                    "processedDataSummary": {"HCP": 1, "ADDRESS": 1},
                    "type": "MANUAL",
                    "dataLoadSummary": {
                        "ADDRESS": {"rowsRead": 1, "rowsParsed": 1},
                        "HCP": {"rowsParsed": 1, "rowsRead": 1}
                    },
                    "badRecordCount": 0,
                    "recordCount": 2,
                    "matchSummary": {
                        "HCP": {"ACT": 1, "notMatched": 0, "ASK": 0},
                        "HCO": {"ACT": 0, "notMatched": 0, "ASK": 0}
                    },
                    "responseStatus": "SUCCESS"
                }
        """
        return self._retrieve_network_process_job(job_resp_id=job_resp_id)
class VeevaTargetSubscriptionClient(VeevaSourceSubscriptionClient):
    """Client that starts target subscription jobs and polls them to completion."""

    def __init__(
        self, dns, username, password, subscription_name, system_name, version=DEFAULT_VERSION
    ):
        """
        Create a client that enables you to manage target subscriptions that import and
        export data to and from the Veeva Network.

        Parameters
        ----------
        dns : str
            The URL for your API service.
        username : str
            The user ID for Network; for example, john.smith@veevanetwork.com.
        password : str
            The password for the user ID.
        subscription_name : str
            The unique name of the subscription.
        system_name : str
            The unique name of the system.
        version : str
            The API version.

        Raises
        ------
        HTTPError
            If the authentication request fails.
        VeevaNetworkException
            If the authorization header is not available.
        """
        super().__init__(
            dns=dns,
            username=username,
            password=password,
            subscription_name=subscription_name,
            system_name=system_name,
            version=version
        )
        # The parent constructor selects the source type; switch to the target
        # type so request URLs address ``target_subscriptions``.
        self.subscription_type = VeevaNetworkSubscriptionType.TARGET.value

    def retrieve_network_process_job(self, job_resp_id):
        """
        Retrieve the status of a source or target subscription job.

        This function will poll until the job has reached a terminal state.

        Parameters
        ----------
        job_resp_id : str
            The unique ID of the job you want to retrieve status on.

        Returns
        -------
        dict
            A dictionary containing the information about the job. A dictionary
            similar to the following is returned::

                {
                    "responseStatus": "SUCCESS",
                    "subscriptionId": 15,
                    "subscriptionName": "targetSubscriptionCustomer",
                    "durationInMilliseconds": 2000,
                    "type": "MANUAL",
                    "errorCount": 0,
                    "badRecordCount": 0,
                    "exportReferenceCount": 0,
                    "exportFull": True,
                    "exportIncludeReference": False,
                    "exportUpdatedChildOnly": False,
                    "exportSetSubscriptionStateOnFull": False,
                    "exportFormat": "CSV",
                    "exportReferenceVersion": "4",
                    "exportActiveOnly": False,
                    "jobExportCount": {
                        "LICENSE": 3961,
                        "RELATION": 333,
                        "HCO": 819,
                        "HCP": 1060,
                        "ADDRESS": 1801,
                        "EXTERNALKEYS": 8038
                    },
                    "job_id": 10563,
                    "job_status": "COMPLETE",
                    "created_date": "2016-11-17T10:58:49.000-08:00",
                    "data_revision_first": "0",
                    "data_revision_last": "929335226137870335",
                    "export_package_path":
                        "export/change_request/targetSubscriptionCustomer/exp_000001C5.zip",
                    "total_records_exported": "1879",
                    "completed_date": "2016-11-17T10:58:51.000-08:00",
                    "export_archive": "individual",
                    "exportFormatDelimiter": "|",
                    "exportFormatTextQualifier": "\""
                }

        Raises
        ------
        HTTPError
            If the request fails.
        VeevaNetworkException
            If the status cannot be retrieved or the job reaches a terminal status
            other than COMPLETE.
        """
        return self._retrieve_network_process_job(job_resp_id)