Programmatically run a dataflow/dataset

Hello,

 

I would like your help understanding how to run a dataset (or a dataflow) from a trigger outside Domo.

Let's say, for instance, I have a Python script that checks a condition on my database; if the condition is met, it should trigger the execution of two MySQL Connectors in Domo.
Would this be possible somehow?

The closest thing I am able to do is to manually run a dataset or a dataflow with the Java CLI, but I was wondering if there is an API that can accomplish this, so that I can call it whenever I need to.

 

I saw another question where a user mentioned an undocumented API, but I cannot find anything more about it.

It would be enough to understand how the two Domo CLI commands

dataflow-run-now

and

dataset-run-now

work, so that I could perhaps emulate the same interaction with my Domo instance programmatically.

Best Answer

  • gllrusso (Member)
    Answer ✓

    Okay, here we are with the solution to this problem! Special thanks to @jaeW_at_Onyx for all the steps.

     

    I've published https://gist.github.com/giuseppellrusso/03daabba54424ceac83309921121320c#file-domo_utils-py with my module developed to programmatically run a dataset or a dataflow.

     

    As you can see from the code, running a dataset is as easy as running the associated stream and thus we can use the PyDomo library (that is, we are using the official and supported API).

     

    Running a dataflow is a bit trickier, because we have to authenticate to our own instance and use the session token to access an undocumented API. However, right now this is working great!

     

    I am also posting the code here; a short usage sketch follows it:

     

    from pydomo import Domo
    import logging
    import json
    import requests
    
    """ Connect to Domo using the Pydomo library and the client id and secret
    Documentation:
    https://developer.domo.com/docs/authentication/quickstart-5
    Args:
        client_id (str): client_id to connect with
        client_secret (str): client secret to connect with
        api_host (str): host of the Domo api
        domo_log_level (logging): level of Domo logger. Default set to DEBUG
    Returns:
        an instance of the Domo object from the Pydomo library
    """
    def get_domo_connection(client_id, client_secret, api_host, domo_log_level = logging.DEBUG) :
        try :
            domo = Domo(client_id, client_secret, log_level=domo_log_level, api_host=api_host)
            return domo
        except Exception as e :
            logging.error(e)
            raise Exception(e)
    
    """ Gets the stream associated with a Domo dataset 
    Args:
        streams (pydomo object): pydomo object containing the list of streams of a domo instance
        dataset_id (str): alphanumeric id of a domo dataset
        stream_list_limit (int): limit of streams to retrieve from the pydomo streams object
    Returns:
        the pydomo stream object of the stream associated with the given dataset
    """
    def get_stream_by_dataset_id(streams, dataset_id, stream_list_limit = 2000) :
        stream_list = streams.list(stream_list_limit,0)
        dataset_streams = [stream for stream in stream_list if stream['dataSet']['id'] == dataset_id]
        if(len(dataset_streams)==1):
            return dataset_streams[0]
        else:
            no_stream_found_string = 'No stream found for dataset {}'.format(dataset_id)
            logging.error(no_stream_found_string)
            raise Exception(no_stream_found_string)
    
    """ Runs a Domo dataset
    The method is actually creating a new execution for the stream associated to the dataset
    Args:
        domo_connection (pydomo object): instance of the pydomo Domo object
        dataset_id (str): alphanumeric id of a domo dataset
    """
    def run_dataset(domo_connection, dataset_id):
        try : 
            streams = domo_connection.streams
            dataset_stream = get_stream_by_dataset_id(streams, dataset_id)
            dataset_stream_id = dataset_stream['id']
            streams.create_execution(dataset_stream_id)
            logging.info('Successfully run dataset id {}'.format(dataset_id))
        except Exception as e:
            logging.error(e)
            raise Exception(e)
    
    """ Gets a session token to be used for undocumented private api
    Args:
        domo_instance (str): name of the domo instance
        email (str): email  used to login to the domo instance
        password (str): password used to login to the domo instance
    Returns:
        the session token
    """
    def get_session_token(domo_instance, email, password):
        auth_api = 'https://{}.domo.com/api/content/v2/authentication'.format(domo_instance)
        auth_body = json.dumps({
            "method": "password",
            "emailAddress": email,
            "password": password
            })
        auth_headers = {'Content-Type'   : 'application/json'}
        auth_response = requests.post(auth_api, data = auth_body, headers = auth_headers)
        auth_status = auth_response.status_code
        if auth_status == 200 :
            logging.info('Session token acquired.')
            return auth_response.json()['sessionToken']
        else :
            token_error_string = 'Token request ended up with status code {}'.format(auth_status)
            logging.error(token_error_string)
            logging.error(auth_response.text)
            raise Exception(token_error_string)
    
    """ Runs a Domo dataflow
    Args:
        domo_instance (str): name of the domo instance
        domo_token (str): session token used to authenticate to the private apis
        dataflow_id (str): id of the dataflow to be run
    Returns:
        the status of the post request used for running the dataflow
    """
    def run_dataflow(domo_instance, domo_token, dataflow_id):
        dataflow_api = 'https://{}.domo.com/api/dataprocessing/v1/dataflows/{}/executions'.format(domo_instance, dataflow_id)
        dataflow_headers = {'Content-Type'   : 'application/json',
                            'x-domo-authentication'  : domo_token}
        dataflow_response = requests.post(url = dataflow_api, headers = dataflow_headers)
        dataflow_status = dataflow_response.status_code
        if dataflow_status == 200 :
            logging.info('Successfully run dataflow id {}'.format(dataflow_id))
            return dataflow_status
        else :
            dataflow_error_string = 'Dataflow run request ended up with status code {}'.format(dataflow_status)
            logging.error(dataflow_error_string)
            logging.error(dataflow_response.text)
            raise Exception(dataflow_error_string)
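
    As a usage sketch, assuming the functions above are saved in a module named domo_utils (the module name, ids, and credentials below are placeholders to replace with your own):

    import logging
    from domo_utils import (get_domo_connection, run_dataset,
                            get_session_token, run_dataflow)
    
    logging.basicConfig(level=logging.INFO)
    
    # Run a connector-backed dataset via the documented Stream API (PyDomo)
    domo = get_domo_connection('<CLIENT_ID>', '<CLIENT_SECRET>', 'api.domo.com')
    run_dataset(domo, '<DATASET_ID>')  # alphanumeric dataset id from the dataset URL
    
    # Run a dataflow via the undocumented dataprocessing API (session token required)
    token = get_session_token('<DOMO_INSTANCE>', '<EMAIL>', '<PASSWORD>')
    run_dataflow('<DOMO_INSTANCE>', token, '<DATAFLOW_ID>')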

Answers

  • you can script the execution of Java CLI commands as per the documentation.

    https://knowledge.domo.com/Administer/Other_Administrative_Tools/Command_Line_Interface_(CLI)_Tool

    see the section on 'Scripting'

     

    a dataflow is an ETL (Magic ETL or MySQL), whereas a dataset implies a dataset created via a connector (a Facebook dataset or a NetSuite dataset, for example).

     

    You can try watching what the CLI executes to get a feel for the APIs being called but, as you said, they are undocumented. A minimal scripting sketch follows this reply.

    Jae Wilson
    Check out my 🎥 Domo Training YouTube Channel 👨‍💻

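
    A minimal sketch of what that scripting could look like from Python, under stated assumptions: the dataflow-run-now / dataset-run-now commands come from this thread, while the jar name (domoUtil.jar), the -script flag, the connect line, and quit are assumptions to verify against the CLI's HELP output and the documentation linked above.

    import subprocess
    import tempfile
    
    # Hypothetical: adjust the jar name and connect syntax to match your CLI version
    CLI_JAR = 'domoUtil.jar'
    
    script_lines = [
        'connect <your connection arguments>',  # check HELP for the exact connect syntax
        'dataflow-run-now -i <DATAFLOW_ID>',    # run a dataflow by id
        'dataset-run-now -i <STREAM_ID> -s',    # run a dataset by its stream id
        'quit',
    ]
    
    # Write the commands to a script file, then invoke the Java CLI non-interactively
    with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as f:
        f.write('\n'.join(script_lines))
        script_path = f.name
    
    subprocess.run(['java', '-jar', CLI_JAR, '-script', script_path], check=True)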
  • @jaeW_at_Onyx thanks for your fast answer! I think either scripting the CLI commands or using the corresponding APIs will let me do the job.

     

    I have actually two questions for you, one for datasets and one for dataflows.

     

    1) Dataflows

    What is the difference between using the dataflow-run-now -i <ID> command and the corresponding API called by my script? Even if not documented, isn't it basically the same approach?

     

    2) Datasets

    I have not been able to use the command dataset-run-now -i <ID> -p; it gives me a java.lang.NumberFormatException. Are you able to tell me why? The <ID> I am using is the one I can grab from the URL of the dataset.

     

    However, I have been able to find the stream_id of the dataset using the command list-dataset -i <ID>.

    Then I tried dataset-run-now -i <STREAM_ID> -s and it worked. What is the difference?

    Also, I see this is basically using the (documented) Stream API, so maybe it is better to use this second approach for datasets coming from connectors?

  • so much going on here.

     

    there isn't necessarily a difference between using the APIs versus the CLI.  the CLI interacts with Domo by passing commands to the API framework... so it's theoretically possible that you are doing the exact same thing.  

     

    "which one is better?" it depends.  how good are you at managing your API calls ?  different tools for different people.  

     

    It is worth noting that SOME of the APIs (documented under developer.domo.com) are 'customer facing' as in Domo Support must support them and they will minimize the impact of changes to APIs.  So it's pretty safe to build against those.

     

    undocumented APIs (which you might get your hands on by watching network traffic in your browser) ARE NOT covered by support and therefore they are under no obligation to ... not impact you or support you when they update or change the APIs 

     

    the Java CLI 'will always work' because it's owned by the Domo Engineering / Labs teams and they extend the Java CLI to solve problems for customers.

     

    in short.  when possible my recommendation is to use the CLI.

     

    re. 2.  if i had to guess the error is your answer. the -i we're looking for is a stream ID (always an integer) not the dataset id (guid string).  always check the included documentation for the CLI by typing HELP.

     

    why do they have stream IDs and dataset ids?  i'm not a Domo engineer, but as I understand it, the dataset ID allows you to identify .... the dataset.  the stream, however, was implemented to support large file transfers, where you could send multiple data partitions (chunks) into one dataset spread across different streams simultaneously. (A minimal upload sketch follows this reply.)

     

    Regarding which is better to use ... LOL.  ah, if only Domo would tell us these things.  there are (last time i was told) 3 different APIs: a Stream API, a Multipart API ... and ... maybe a Dataset API.  each of them ingests data in slightly different ways and supports different use cases.  Which one's the best?  :( i don't know. 

     

    I believe workbench and the java cli will 'pick' the best API for the task or depending on the configuration.

     

    If you're talking sub million row datasets i suspect it doesn't really matter and you'll get similar results.  So i'd say, don't sweat it.

     

    Why aren't you using Workbench?

     

    I ask because, if you fall off a truck, or win the lottery, or want to go on holiday, it makes sense to implement pipelines that don't require YOU to maintain them. i know it's job security, but it also makes you the bottleneck ...

     

     

    Jae Wilson
    Check out my 🎥 Domo Training YouTube Channel 👨‍💻

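
    To illustrate the chunked-upload idea with the documented Stream API, here is a minimal PyDomo sketch; the credentials, the stream id, and the CSV contents are placeholders:

    from pydomo import Domo
    
    domo = Domo('<CLIENT_ID>', '<CLIENT_SECRET>', api_host='api.domo.com')
    streams = domo.streams
    stream_id = 42  # replace with your integer stream id (not the dataset GUID)
    
    # One execution can receive several parts (chunks), uploaded independently
    execution = streams.create_execution(stream_id)
    execution_id = execution['id']
    
    streams.upload_part(stream_id, execution_id, 1, 'Leonhard Euler,1707\n')
    streams.upload_part(stream_id, execution_id, 2, 'Carl Friedrich Gauss,1777\n')
    
    # Commit once all parts are uploaded; Domo assembles them into the dataset
    streams.commit_execution(stream_id, execution_id)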
  • First of all, thank you so much for your answers. It is really helping me.

     

    My goal here is to have an external scheduler that lets me manage different executions across different systems, not only in Domo. That's why I'm trying to access all the systems through their APIs, so I can write my scheduler exactly as I wish. I don't think this is something Workbench can help me with, or am I missing something?

     

    By the way, as far as datasets and their ids / stream_ids are concerned, I bet this is something we would not really know; but at least I have been able to use the documented Stream API to programmatically launch the dataset connectors, so I'm very happy to use the documented APIs.

     

    On the other hand, I'm still having some trouble with dataflows. I've successfully run this command:

    dataflow-run-now -i 350

    And what I see from the CLI is that this API is called:

    Executing POST: https://[my_domain].domo.com/api/dataprocessing/v1/dataflows/350/executions

     

     

    However, when trying to execute a POST request (in a Python script) using this URL and the authentication that works perfectly with the other APIs, I get this result:

    {"status":401,"statusReason":"Unauthorized","path":"/api/dataprocessing/v1/dataflows/350/executions","message":"Full authentication is required to access this resource","toe":"XXXXXXXXXXXXX"}

     

    To generate the token I'm using the same credentials (I'm an admin) I use to connect to the Java CLI, so this really makes no sense to me. Does anyone have a suggestion about what the problem could be?

    I may try the Java CLI scripting approach, but I'm stubborn and I would definitely like to understand the difference between the two approaches.

  • ok... so ... tradesies.

     

    i'm taking the time to help you.  in exchange, i'd appreciate it if you do a writeup for the community that steps through the final  (anonymized) script you develop.

     

    deal?

     

    what you're encountering here is the difference between using a public vs. a private API, hence why it's asking you for a different authentication method.

     

    {"status":401,"statusReason":"Unauthorized","path":"/api/dataprocessing/v1/dataflows/350/executions","message":"Full authentication is required to access this resource","toe":"XXXXXXXXXXXXX"}

     

    In order for this to work you must:

     

    send a POST request to

    https://yourDomoInstance.domo.com/api/content/v2/authentication

     

    with a Body containing:
    {
        "method": "password",
        "emailAddress": "yourUserName",
        "password": "yourPassword"
    }
    in the response you'll get a sessionToken.
     
    pop that token either into your cookies as
    DA-SID=sessionToken
     
    OR for future API requests (any that are prefixed with your instance name)
    in the headers for authentication use 
    X-DOMO-Developer-Token : sessionToken.
     
    Let me know if that works (a minimal sketch of both options follows this reply).  P.S. AFAIK this is all unsupported. 
     
    Jae Wilson
    Check out my 🎥 Domo Training YouTube Channel 👨‍💻

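
    A minimal sketch of those two options with requests; the instance name, credentials, and dataflow id are placeholders:

    import requests
    
    instance = '<YOUR_INSTANCE>'
    auth_url = 'https://{}.domo.com/api/content/v2/authentication'.format(instance)
    body = {'method': 'password', 'emailAddress': '<EMAIL>', 'password': '<PASSWORD>'}
    
    # requests' json= parameter serializes the body and sets the JSON Content-Type
    session_token = requests.post(auth_url, json=body).json()['sessionToken']
    
    run_url = 'https://{}.domo.com/api/dataprocessing/v1/dataflows/{}/executions'.format(
        instance, '<DATAFLOW_ID>')
    
    # Option 1: session token in a request header
    r = requests.post(run_url, headers={'X-DOMO-Developer-Token': session_token})
    
    # Option 2: session token in the DA-SID cookie (either option alone should do)
    r = requests.post(run_url, cookies={'DA-SID': session_token})
    
    print(r.status_code, r.text)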
  • Hi Jae, of course I'm going to do a writeup for the whole community!

     

    Let's start (maybe at the end I will host this code somewhere, improving it and commenting it properly) with a dataset powered into Domo through a connector: this is the solution you can use to run it with Python and Pydomo using the supported and documented API.

     

    from pydomo import Domo
    
    client_id = <CLIENT_ID>
    client_secret = <CLIENT_SECRET>
    api_host = 'api.domo.com'
    
    stream_id = <STREAM_ID>
    
    domo = Domo(client_id, client_secret, api_host=api_host)
    streams = domo.streams
    execution = streams.create_execution(stream_id)

     

    Just a note on how to get the <STREAM_ID>.

    From the Domo Web Interface you are only able to get the <DATASET_ID>, but as we stated before, behind the scenes there is a <STREAM_ID> associated with the dataset.

    Right now I have two approaches:

    1. Use the java-cli and the command list-dataset -i <DATASET_ID> and extract the <STREAM_ID> from the output given by the java-cli
    2. Loop over the streams and find the one associated with your <DATASET_ID>. I would not recommend using this solution, though (an alternative using the Stream search endpoint is sketched after this snippet):

     

    def get_stream_id_by_dataset_id(domo, list_limit, dataset_id) :
        streams = domo.streams
        stream_list = streams.list(list_limit, 0)
        dataset_streams = [stream for stream in stream_list if stream['dataSet']['id'] == dataset_id]
        if(len(dataset_streams)==1):
            return dataset_streams[0]['id']
        else:
            raise Exception('No stream is associated to the given dataset')
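
    A possible alternative to looping, assuming the Stream search endpoint accepts a dataSource.id query (the PyDomo examples demonstrate searching by dataSource.name):

    def get_stream_id_by_search(domo, dataset_id):
        # Assumption: search by dataSource.id behaves like the documented dataSource.name search
        results = domo.streams.search('dataSource.id:{}'.format(dataset_id))
        if len(results) != 1:
            raise Exception('Expected exactly one stream for dataset {}'.format(dataset_id))
        return results[0]['id']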

     

    Let's go back to our dataflows. I think I can state there is no way to use a documented API to programmatically run a dataflow.

    However, we can use the dataprocessing API with full authentication via the session token, as you suggested.
    This is a working script you can use once you have such a token:

     

    import requests
    
    domo_token = <ACCESS_TOKEN>
    domo_instance = <YOUR_DOMO_INSTANCE>
    dataflow_id = <DATAFLOW_ID>
    dataflow_api = 'https://{}.domo.com/api/dataprocessing/v1/dataflows/{}/executions'.format(domo_instance, dataflow_id)
    
    print(dataflow_api) 
    headers = {'Content-Type'   : 'application/json',
               'Accept'         : 'application/json',
               'X-DOMO-Developer-Token'  : domo_token}
    
    execute_dataflow_response = requests.post(url = dataflow_api, headers = headers)
    
    if execute_dataflow_response.status_code == 200 :
        print(execute_dataflow_response.json())
    else :
        print(execute_dataflow_response.json())
        raise Exception('Dataflow execution API response status code {}'.format(execute_dataflow_response.status_code))

     

     

     

    The very last problem I encountered was how to get the token. Right now I get the token by just following this guide (official Domo documentation), but I couldn't get it through an API request.

     

    What I tried is to make a POST request to 'https://<my-domo-instance>.domo.com/api/content/v2/authentication' with this data:

     

     

    body_att = {
            "method" : "password",
            "emailAddress" : <MY_EMAIL>,
            "password" : <MY_PASSWORD>}

     

     

    However, I got this response:

     

    {'status': 400, 'statusReason': 'Bad Request', 'message': 'Bad Request', 'toe': 'XXXXXXXXXXXXXXXXXXXX'}

     


    I tried the same thing switching to v1 ('https://<my-domo-instance>.domo.com/api/content/v1/authentication') and I received the 'unauthorized' error:

     

    {'status': 401, 'statusReason': 'Unauthorized', 'path': '/api/content/v1/authentication', 'message': 'Full authentication is required to access this resource', 'toe': 'XXXXXXXXXXXXXXX'}

     

     

    Do you have any clue what the reason may be?
    In any case, I already want to thank you: even though I still have to perform the manual step of managing the token, I achieved the goal of running the dataflow from my script!!! Thanks!!

  • AWESOME WRITEUP.  KEEP IT UP!  We really need more content like this

     

    when I copy and paste this into Postman, I get a good response.

     

    curl --location --request POST 'https://<>.domo.com/api/content/v2/authentication' \
    --header 'Content-Type: application/json' \
    --data-raw '{
        "method": "password",
        "emailAddress": "<>",
        "password": "<>"
    }'
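
    For the Python side, the 400 Bad Request reported above is most likely because the dict body was form-encoded instead of being sent as JSON; here is a minimal sketch using requests' json= parameter, which also sets the Content-Type header for you (the instance and credentials are placeholders):

    import requests
    
    auth_url = 'https://<my-domo-instance>.domo.com/api/content/v2/authentication'
    body_att = {
        'method': 'password',
        'emailAddress': '<MY_EMAIL>',
        'password': '<MY_PASSWORD>',
    }
    
    # json= serializes the body to JSON and sets Content-Type: application/json,
    # mirroring what the curl above does with --data-raw plus the header
    response = requests.post(auth_url, json=body_att)
    response.raise_for_status()
    session_token = response.json()['sessionToken']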
     
    side note, there's a global domo user group slack channel, https://join.slack.com/t/domousergroup/shared_invite/zt-dxq99gyf-MBatRTzIbpUAqTCYfDDJZw you're welcome to join and find me there!
    Jae Wilson
    Check out my 🎥 Domo Training YouTube Channel 👨‍💻

This discussion has been closed.