8.6. Algorithm client and tools#

8.6.1. Algorithm Client#

vantage6.algorithm.client#

class AlgorithmClient(token, *args, **kwargs)#

Bases: ClientBase

Interface to communicate between the algorithm container and the central server via a local proxy server.

An algorithm container cannot communicate directly with the central server because it has no internet connection. The algorithm can, however, talk to a local proxy server, which has an interface to the central server. This ensures that the algorithm container does not share details with others, and it also lets us encrypt the results for a specific receiver. Thus, this is not an interface to the central server but to the local proxy server. The interface looks identical, however, to make it easier to use.

Parameters:
  • token (str) – JWT (container) token, generated by the node the algorithm container runs on

  • *args – Arguments passed to the parent ClientBase class.

  • **kwargs – Arguments passed to the parent ClientBase class.

class Collaboration(parent)#

Bases: SubClient

Get information about the collaboration.

get()#

Get the collaboration data.

Returns:

Dictionary containing the collaboration data.

Return type:

dict

class Node(parent)#

Bases: SubClient

Get information about the node.

get()#

Get the node data.

Returns:

Dictionary containing data on the node this algorithm is running on.

Return type:

dict

class Organization(parent)#

Bases: SubClient

Get information about organizations in the collaboration.

get(id_)#

Get an organization by ID.

Parameters:

id_ (int) – ID of the organization to retrieve

Returns:

Dictionary containing the organization data.

Return type:

dict

list()#

Obtain all organizations in the collaboration.

The container runs in a node that is part of a single collaboration. This method retrieves the data of all organizations within that collaboration, which can be used to target specific organizations.

Returns:

List of organizations in the collaboration.

Return type:

list[dict]

class Result(parent)#

Bases: SubClient

Result client for the algorithm container.

This client is used to get results from the central server.

from_task(task_id)#

Obtain results from a specific task at the server.

Containers are allowed to obtain the results of their children (having the same job_id at the server). The permissions are checked at the central server.

Results are decrypted by the proxy server and decoded here before returning them to the algorithm.

Parameters:

task_id (int) – ID of the task from which you want to obtain the results

Returns:

List of results. The type of the results depends on the algorithm.

Return type:

list[Any]

get(id_)#

Obtain a specific result from the central server.

Parameters:

id_ (int) – ID of the algorithm run whose result should be obtained.

Returns:

Result of the algorithm run.

Return type:

Any

class Run(parent)#

Bases: SubClient

Algorithm Run client for the algorithm container.

This client is used to obtain algorithm runs of tasks with the same job_id from the central server.

from_task(task_id)#

Obtain algorithm runs from a specific task at the server.

Containers are allowed to obtain the runs of their children (having the same job_id at the server). The permissions are checked at the central server.

Note that the returned results are not decrypted. The algorithm is responsible for decrypting the results.

Parameters:

task_id (int) – ID of the task from which you want to obtain the algorithm runs

Returns:

List of algorithm run data. The type of the results depends on the algorithm.

Return type:

list

get(id_)#

Obtain a specific algorithm run from the central server.

Parameters:

id_ (int) – ID of the algorithm run that should be obtained.

Returns:

Algorithm run data.

Return type:

dict

class Study(parent)#

Bases: SubClient

Get information about the study or studies.

get(id_)#

Get the study data by ID.

Parameters:

id_ (int) – ID of the study to retrieve

Returns:

Dictionary containing study data.

Return type:

dict

list()#

Obtain all studies in the collaboration.

The container runs in a node which is part of a single collaboration, which may contain zero or more studies. This method retrieves all studies that are part of the collaboration.

Returns:

List of studies in the collaboration.

Return type:

list[dict]

class Task(parent)#

Bases: SubClient

A task client for the algorithm container.

It provides functions to get task information and create new tasks.

create(input_, organizations=None, name='subtask', description=None)#

Create a new (child) task at the central server.

Containers are allowed to create child tasks (having the same job_id) at the central server. The Docker image of a child task must be the same as the Docker image of this container itself.

Parameters:
  • input_ (bytes) – Input to the task. Should be base64 encoded.

  • organizations (list[int]) – List of organization IDs that should execute the task.

  • name (str, optional) – Name of the subtask

  • description (str, optional) – Description of the subtask

Returns:

Dictionary containing information on the created task

Return type:

dict

get(task_id)#

Retrieve a task at the central server.

Parameters:

task_id (int) – ID of the task to retrieve

Returns:

Dictionary containing the task information

Return type:

dict

class VPN(parent)#

Bases: SubClient

A VPN client for the algorithm container.

It provides functions to obtain the IP addresses of other containers.

get_addresses(only_children=False, only_parent=False, only_siblings=False, only_self=False, include_children=False, include_parent=False, label=None)#

Get information about the VPN IP addresses and ports of other algorithm containers involved in the current task. These addresses can be used as targets for VPN communication.

Multiple ports may be exposed for a single algorithm run, so it is possible that multiple ports are returned for a single IP.

Parameters:
  • only_children (bool, optional) – Only return the IP addresses of the children of the current task, by default False. Incompatible with other only_* parameters.

  • only_parent (bool, optional) – Only return the IP address of the parent of the current task, by default False. Incompatible with other only_* parameters.

  • only_siblings (bool, optional) – Only return the IP addresses of the siblings of the current task, by default False. Incompatible with other only_* parameters.

  • only_self (bool, optional) – Only return the IP address of the current task, by default False. Incompatible with other only_* parameters.

  • include_children (bool, optional) – Include the IP addresses of the children of the current task, by default False. Incompatible with only_parent, superseded by only_children.

  • include_parent (bool, optional) – Include the IP address of the parent of the current task, by default False. Incompatible with only_children, superseded by only_parent.

  • label (str, optional) – The label of the port you are interested in, which is set in the algorithm Dockerfile. If this parameter is set, only the ports with this label will be returned.

Returns:

List of dictionaries with algorithm addresses. Each dictionary contains the keys ‘ip’, ‘port’, ‘label’, ‘organization_id’, ‘task_id’, and ‘parent_id’. If obtaining the VPN addresses from the server fails, a dictionary with a ‘message’ key is returned instead.

Return type:

list[dict]
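Because several ports can be returned for one IP, and because a failed lookup yields a dictionary with a ‘message’ key rather than a list, it can help to normalize the return value before using it. The following is a minimal sketch of that idea. The helper name ports_by_ip is hypothetical and not part of vantage6; it only relies on the ‘ip’, ‘port’ and ‘label’ keys described above.

```python
from collections import defaultdict


def ports_by_ip(addresses, label=None):
    """Group ports per IP address (hypothetical helper, not part of vantage6).

    `addresses` is expected to be the value returned by get_addresses().
    """
    if isinstance(addresses, dict):
        # Documented failure case: a dictionary with a 'message' key.
        raise RuntimeError(addresses.get("message", "could not obtain VPN addresses"))

    grouped = defaultdict(list)
    for address in addresses:
        # Optionally keep only the ports that carry the requested label.
        if label is None or address["label"] == label:
            grouped[address["ip"]].append(address["port"])
    return dict(grouped)
```

Raising an exception on the ‘message’ dictionary is a design choice of this sketch; an algorithm could equally log the message and continue without VPN peers.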

get_child_addresses()#

Get the IP addresses and port numbers of the children of the current algorithm run.

Multiple ports may be exposed for a single algorithm run, so it is possible that multiple ports are returned for a single IP.

Returns:

List of dictionaries with algorithm addresses. Each dictionary contains the keys ‘ip’, ‘port’, ‘label’, ‘organization_id’, ‘task_id’, and ‘parent_id’. If obtaining the VPN addresses from the server fails, a dictionary with a ‘message’ key is returned instead.

Return type:

list[dict]

get_own_address()#

Get the IP address and port number of the current algorithm run.

Multiple ports may be exposed for a single algorithm run, so it is possible that multiple ports are returned for a single IP.

Returns:

List of dictionaries with algorithm addresses. Each dictionary contains the keys ‘ip’, ‘port’, ‘label’, ‘organization_id’, ‘task_id’, and ‘parent_id’. If obtaining the VPN addresses from the server fails, a dictionary with a ‘message’ key is returned instead.

Return type:

list[dict]

get_parent_address()#

Get the IP address and port number of the parent of the current algorithm run.

Multiple ports may be exposed for a single algorithm run, so it is possible that multiple ports are returned for a single IP.

Returns:

List of dictionaries with algorithm addresses. Each dictionary contains the keys ‘ip’, ‘port’, ‘label’, ‘organization_id’, ‘task_id’, and ‘parent_id’. If obtaining the VPN addresses from the server fails, a dictionary with a ‘message’ key is returned instead.

Return type:

list[dict]

get_sibling_addresses()#

Get the IP addresses and port numbers of the siblings of the current algorithm run.

Multiple ports may be exposed for a single algorithm run, so it is possible that multiple ports are returned for a single IP.

Returns:

List of dictionaries with algorithm addresses. Each dictionary contains the keys ‘ip’, ‘port’, ‘label’, ‘organization_id’, ‘task_id’, and ‘parent_id’. If obtaining the VPN addresses from the server fails, a dictionary with a ‘message’ key is returned instead.

Return type:

list[dict]

authenticate(credentials=None, path=None)#

Override the base authenticate function to prevent algorithm containers from trying to authenticate, which they would be unable to do (they are already provided with a token on container startup).

Function parameters have only been included to make the interface identical to the parent class. They are not used.

Parameters:
  • credentials (dict) – Credentials to authenticate with.

  • path (str) – Path to the credentials file.

Raises:

NotImplementedError – Always.

Return type:

None

refresh_token()#

Override the base refresh_token function to prevent algorithm containers from trying to refresh their token, which they would be unable to do.

Raises:

NotImplementedError – Always.

Return type:

None

request(*args, **kwargs)#

Make a request to the central server. This overrides the parent function so that containers will not try to refresh their token, which they would be unable to do.

Parameters:
  • *args – Arguments passed to the parent ClientBase.request function.

  • **kwargs – Arguments passed to the parent ClientBase.request function.

Returns:

Response from the central server.

Return type:

dict

wait_for_results(task_id, interval=1)#

Poll the central server until results are available and then return them.

Parameters:
  • task_id (int) – ID of the task for which the results should be obtained.

  • interval (float) – Interval in seconds to wait between checking server for results.

Returns:

List of task results.

Return type:

list
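The methods above are typically combined: list the organizations, create a child task for them, and wait for the results. The sketch below illustrates that flow under several assumptions that this reference does not confirm: that the subclients are reachable as the lowercase attributes client.organization and client.task, and that organization and task dictionaries contain an "id" key. The function name run_subtask_everywhere is hypothetical, and input_ is passed through unchanged because its exact format depends on the algorithm.

```python
def run_subtask_everywhere(client, input_, interval=1):
    """Create a child task at every organization and return its results.

    `client` is assumed to behave like an AlgorithmClient instance.
    """
    # Assumption: each organization dictionary has an "id" key.
    organizations = client.organization.list()
    org_ids = [org["id"] for org in organizations]

    # Signature as documented: create(input_, organizations, name, description).
    task = client.task.create(input_=input_, organizations=org_ids, name="subtask")

    # Assumption: the created task dictionary has an "id" key.
    return client.wait_for_results(task_id=task["id"], interval=interval)
```

Since the function only depends on the client's interface, the same code can be exercised with the MockAlgorithmClient or any stub object that offers the same methods.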

8.6.2. Algorithm tools#

vantage6.tools.wrappers#

This module contains algorithm wrappers. These wrappers are used to provide different data adapters to the algorithms. This way we only need to write the algorithm once and can use it with different data adapters.

Currently the following wrappers are available:
  • DockerWrapper (= CSVWrapper)

  • SparqlDockerWrapper

  • ParquetWrapper

  • SQLWrapper

  • ExcelWrapper

When writing the Dockerfile for the algorithm, the correct wrapper will automatically be selected based on the database type. The database type is set by the vantage6 node based on its configuration file.

class DatabaseType(value)#

Bases: str, Enum

Enum for the different database types.

Variables:
  • CSV (str) – CSV database

  • SQL (str) – SQL database

  • EXCEL (str) – Excel database

  • SPARQL (str) – SPARQL database

  • PARQUET (str) – Parquet database
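Because DatabaseType derives from both str and Enum, its members compare equal to plain strings and can be looked up by value. The stand-in below sketches that behaviour. The class name DatabaseTypeSketch and the lowercase member values are assumptions made for illustration (only "csv" and "sql" appear as example values elsewhere in this reference); consult the real enum for the actual values.

```python
from enum import Enum


class DatabaseTypeSketch(str, Enum):
    """Stand-in for DatabaseType; the member values are assumed."""

    CSV = "csv"
    SQL = "sql"
    EXCEL = "excel"
    SPARQL = "sparql"
    PARQUET = "parquet"


def parse_db_type(raw):
    """Return the matching member, or None for an unknown type."""
    try:
        # Lookup by value; lower() makes the check case-insensitive.
        return DatabaseTypeSketch(raw.lower())
    except ValueError:
        return None


# Members of a str-based Enum compare equal to their string value.
is_sql = DatabaseTypeSketch.SQL == "sql"
```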

get_column_names(database_uri, db_type=None, query=None, sheet_name=None)#

Get the column names of the dataframe that will be loaded into an algorithm.

Parameters:
  • database_uri (str) – Path to the database file or URI of the database.

  • db_type (str) – The type of the database. This should be one of CSV, SQL, Excel, SPARQL, or Parquet.

  • query (str) – The query to execute on the database. This is required for SQL and SPARQL databases.

  • sheet_name (str) – The sheet name to read from the Excel file. This is optional and only for Excel databases.

Returns:

The column names of the dataframe

Return type:

list[str]

load_csv_data(database_uri)#

Load the local privacy-sensitive data from the database.

Parameters:

database_uri (str) – URI of the csv file, supplied by the node

Returns:

The data from the csv file

Return type:

pd.DataFrame

load_data(database_uri, db_type=None, query=None, sheet_name=None)#

Read data from database and give it back to the algorithm.

If the database type is unknown, this function will exit. A ‘query’ is also required for SQL and SPARQL databases; if it is not present, this function will exit the algorithm.

Parameters:
  • database_uri (str) – Path to the database file or URI of the database.

  • db_type (str) – The type of the database. This should be one of CSV, SQL, Excel, SPARQL, or Parquet.

  • query (str) – The query to execute on the database. This is required for SQL and SPARQL databases.

  • sheet_name (str) – The sheet name to read from the Excel file. This is optional and only for Excel databases.

Returns:

The data from the database

Return type:

pd.DataFrame
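Since load_data exits the algorithm when the database type is unknown or when a query is missing for SQL or SPARQL, a caller may prefer to validate its arguments first and fail with a regular exception. The following sketch mirrors the documented rules only; the helper check_load_arguments is hypothetical, and the lowercase type strings are assumed rather than taken from the library.

```python
# Assumed string values for the supported database types.
KNOWN_TYPES = {"csv", "sql", "excel", "sparql", "parquet"}
# Types for which the documentation requires a query.
QUERY_REQUIRED = {"sql", "sparql"}


def check_load_arguments(db_type, query=None):
    """Validate arguments before calling load_data (hypothetical helper).

    Returns the normalized database type, or raises ValueError.
    """
    normalized = db_type.lower()
    if normalized not in KNOWN_TYPES:
        raise ValueError(f"Unknown database type: {db_type!r}")
    if normalized in QUERY_REQUIRED and not query:
        raise ValueError(f"A query is required for {normalized} databases")
    return normalized
```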

load_excel_data(database_uri, sheet_name=None)#

Load the local privacy-sensitive data from the database.

Parameters:
  • database_uri (str) – URI of the excel file, supplied by the node

  • sheet_name (str | None) – Sheet name to be read from the excel file. If None, the first sheet will be read.

Returns:

The data from the excel file

Return type:

pd.DataFrame

load_parquet_data(database_uri)#

Load the local privacy-sensitive data from the database.

Parameters:

database_uri (str) – URI of the parquet file, supplied by the node

Returns:

The data from the parquet file

Return type:

pd.DataFrame

load_sparql_data(database_uri, query)#

Load the local privacy-sensitive data from the database.

Parameters:
  • database_uri (str) – URI of the triplestore, supplied by the node

  • query (str) – Query to retrieve the data from the triplestore

Returns:

The data from the triplestore

Return type:

pd.DataFrame

load_sql_data(database_uri, query)#

Load the local privacy-sensitive data from the database.

Parameters:
  • database_uri (str) – URI of the sql database, supplied by the node

  • query (str) – Query to retrieve the data from the database

Returns:

The data from the database

Return type:

pd.DataFrame

vantage6.tools.wrap#

load_input(input_file)#

Load the input from the input file.

Parameters:

input_file (str) – File containing the input

Returns:

input_data – Input data for the algorithm

Return type:

Any

Raises:

DeserializationError – Failed to deserialize input data

wrap_algorithm(log_traceback=True)#

Wrap an algorithm module to provide input and output handling for the vantage6 infrastructure.

Data is received in the form of files, whose location should be specified in the following environment variables:

  • INPUT_FILE: input arguments for the algorithm. This file should be encoded in JSON format.

  • OUTPUT_FILE: location where the results of the algorithm should be stored

  • TOKEN_FILE: access token for the vantage6 server REST api

  • USER_REQUESTED_DATABASE_LABELS: comma-separated list of database labels that the user requested

  • <DB_LABEL>_DATABASE_URI: URI of each of the databases that the user requested, where <DB_LABEL> is the label of the database given in USER_REQUESTED_DATABASE_LABELS.

The wrapper expects the input file to be a json file. Any other file format will result in an error.

Parameters:
  • module (str) – Python module name of the algorithm to wrap.

  • log_traceback (bool) – Whether to print the full error message from algorithms or not. Algorithm developers should set this to False if the error messages may contain sensitive information. By default True.

Return type:

None
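The relation between USER_REQUESTED_DATABASE_LABELS and the <DB_LABEL>_DATABASE_URI variables can be sketched as a simple lookup. This is an illustration rather than the wrapper's actual code: the helper resolve_database_uris is hypothetical, it assumes that <DB_LABEL> is the upper-cased label, and it takes a mapping of already decoded values instead of reading the process environment directly.

```python
def resolve_database_uris(env):
    """Map every requested database label to its URI (hypothetical helper).

    `env` is a mapping of decoded environment variable names to values.
    """
    raw_labels = env.get("USER_REQUESTED_DATABASE_LABELS", "")
    # The variable holds a comma-separated list of labels.
    labels = [label.strip() for label in raw_labels.split(",") if label.strip()]

    # Assumption: the variable name uses the upper-cased label.
    return {label: env[f"{label.upper()}_DATABASE_URI"] for label in labels}
```

Passing the environment in as a mapping keeps the sketch independent of how the node encodes the variables, and makes it easy to test with a plain dictionary.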

vantage6.tools.mock_client#

class MockAlgorithmClient(datasets, module, collaboration_id=None, organization_ids=None, node_ids=None)#

The MockAlgorithmClient mimics the behaviour of the AlgorithmClient. It can be used to mock the behaviour of the AlgorithmClient and its communication with the server.

Parameters:
  • datasets (list[list[dict]]) –

    A list that contains the datasets that are used in the mocked algorithm. The outer list has one entry per organization; each inner list contains the datasets for that organization. A single dataset should be described as a dictionary with the same keys as in a node configuration:

    • database: str (path to file or SQL connection string) or pd.DataFrame

    • db_type (str, e.g. “csv” or “sql”)

    There are also a number of keys that are optional but may be required depending on the database type:

    • query: str (required for SQL/Sparql databases)

    • sheet_name: str (optional for Excel databases)

    • preprocessing: dict (optional, see the documentation for preprocessing for more information)

    Note that if the database is a pandas DataFrame, the type and input_data keys are not required.

  • module (str) – The name of the module that contains the algorithm.

  • collaboration_id (int, optional) – Sets the mocked collaboration id to this value. Defaults to 1.

  • organization_ids (list[int], optional) – Set the organization ids to this value. The first value is used for this organization, the rest for child tasks. Defaults to [0, 1, 2, …].

  • node_ids (list[int], optional) – Set the node ids to this value. The first value is used for this node, the rest for child tasks. Defaults to [0, 1, 2, …].
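The nested shape of the datasets parameter is easiest to see in a concrete value. The sketch below builds such a value for two organizations using only the "database" and "db_type" keys described above. The helper build_datasets and the file names are hypothetical; the resulting list is what would be passed as the datasets argument.

```python
def build_datasets(paths_per_org, db_type="csv"):
    """Build a list[list[dict]] for the mock client (hypothetical helper).

    The outer list has one entry per organization; each inner list holds
    one dictionary per dataset of that organization.
    """
    return [
        [{"database": path, "db_type": db_type} for path in paths]
        for paths in paths_per_org
    ]


# Two organizations: the first has one dataset, the second has two.
datasets = build_datasets([["org_a.csv"], ["org_b.csv", "org_b_extra.csv"]])
```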

class Collaboration(parent)#

Collaboration subclient for the MockAlgorithmClient

get(is_encrypted=True)#

Get mocked collaboration

Parameters:

is_encrypted (bool) – Whether the collaboration is encrypted or not. Default True.

Returns:

A mocked collaboration.

Return type:

dict

class Node(parent)#

Node subclient for the MockAlgorithmClient

get(is_online=True)#

Get mocked node

Parameters:

is_online (bool) – Whether the node is online or not. Default True.

Returns:

A mocked node.

Return type:

dict

class Organization(parent)#

Organization subclient for the MockAlgorithmClient

get(id_)#

Get mocked organization by ID

Parameters:

id_ (int) – The id of the organization.

Returns:

A mocked organization.

Return type:

dict

list()#

Get mocked organizations in the collaboration.

Returns:

A list of mocked organizations in the collaboration.

Return type:

list[dict]

class Result(parent)#

Result subclient for the MockAlgorithmClient

from_task(task_id)#

Return the results of the task with the given id.

Parameters:

task_id (int) – The id of the task.

Returns:

The results of the task.

Return type:

list[Any]

get(id_)#

Get mocked result by ID

Parameters:

id_ (int) – The id of the result.

Returns:

A mocked result.

Return type:

Any

class Run(parent)#

Run subclient for the MockAlgorithmClient

from_task(task_id)#

Get mocked runs by task ID

Parameters:

task_id (int) – The id of the task.

Returns:

A list of mocked runs.

Return type:

list[dict]

get(id_)#

Get mocked run by ID

Parameters:

id_ (int) – The id of the run.

Returns:

A mocked run.

Return type:

dict

class SubClient(parent)#

Create sub groups of commands using this SubClient

Parameters:

parent (MockAlgorithmClient) – The parent client

class Task(parent)#

Task subclient for the MockAlgorithmClient

create(input_, organizations, name='mock', description='mock')#

Create a new task with the MockProtocol and return information on the created task.

Parameters:
  • input_ (dict) – The input data that is passed to the algorithm. This should at least contain the key ‘method’ which is the name of the method that should be called. Other keys depend on the algorithm.

  • organizations (list[int]) – A list of organization ids that should run the algorithm.

  • name (str, optional) – The name of the task, by default “mock”

  • description (str, optional) – The description of the task, by default “mock”

Returns:

A dictionary with information on the created task.

Return type:

dict

get(task_id)#

Return the task with the given id.

Parameters:

task_id (int) – The id of the task.

Returns:

The task details.

Return type:

dict

wait_for_results(task_id, interval=1)#

Mock waiting for results - just return the results as tasks are completed synchronously in the mock client.

Parameters:
  • task_id (int) – ID of the task for which the results should be obtained.

  • interval (float) – Interval in seconds between checking for new results. This is ignored in the mock client but included to match the signature of the AlgorithmClient.

Returns:

List of task results.

Return type:

list

vantage6.tools.util#

error(msg)#

Print an error message to stdout.

Parameters:

msg (str) – Error message to be printed

Return type:

None

get_env_var(var_name, default=None)#

Get the value of an environment variable. Environment variables are encoded by the node so they need to be decoded here.

Note that this decoding follows the reverse of the encoding in the node: first replace ‘=’ back and then decode the base32 string.

Parameters:
  • var_name (str) – Name of the environment variable

  • default (str | None) – Default value to return if the environment variable is not found

Returns:

var_value – Value of the environment variable, or default value if not found

Return type:

str | None
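The decoding order described above (restore ‘=’ first, then decode base32) can be sketched as a round trip. This is an illustration, not the library's code: the function names are hypothetical, and the placeholder character "!" is an assumption, since this reference does not state which character the node substitutes for ‘=’.

```python
import base64

# Assumption: the character that stands in for "=" in encoded values.
EQUALS_REPLACEMENT = "!"


def encode_env_value(value):
    """Encode like the node is described to: base32, then replace '='."""
    encoded = base64.b32encode(value.encode("utf-8")).decode("utf-8")
    return encoded.replace("=", EQUALS_REPLACEMENT)


def decode_env_value(encoded):
    """Reverse the encoding: restore '=' first, then decode base32."""
    restored = encoded.replace(EQUALS_REPLACEMENT, "=")
    return base64.b32decode(restored.encode("utf-8")).decode("utf-8")
```

The replacement step is safe in this sketch because "!" is not part of the base32 alphabet, so it cannot collide with encoded data.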

info(msg)#

Print an info message to stdout.

Parameters:

msg (str) – Message to be printed

Return type:

None

warn(msg)#

Print a warning message to stdout.

Parameters:

msg (str) – Warning message to be printed

Return type:

None