Create an operator for Applications
This walkthrough assumes you have checked out the DCH hello world app from Walkthrough Application: Creating an application. To get started, work through the prerequisites below so you can run the tests locally.
Prerequisites
Setup python
The DCH hello world application uses Python. The most recent image for the operator uses version 3.9, so install Python 3.9 for your platform. We have no strong opinion about how you install Python; good places to start are https://docs.anaconda.com/miniconda/miniconda-install/ and https://www.python.org/downloads/.
Setup poetry
The example code uses poetry to manage dependencies. To get started with the DCH hello world application you need to install poetry.
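If you do not already have poetry installed, one common way (see the poetry documentation for alternatives) is:
pipx install poetry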
Install requirements
Once poetry is installed, use it to install the dependencies:
poetry install
Run tests
The hello world application comes with some simple tests. Writing tests for your service logic is a good way to make sure your operator does what you expect before you deploy the application.
You can find example tests in the operator/tests folder of the hello world application repository.
To run tests, execute the following command:
poetry run pytest
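As a rough sketch, a pytest test for the business logic can be as small as this; the src.greeting module and build_greeting function are hypothetical stand-ins, not part of the hello world repository:
# tests/test_greeting.py - minimal pytest sketch (hypothetical module and function)
from src.greeting import build_greeting

def test_build_greeting_includes_name():
    # The business logic is a plain function, so it needs no DCH infrastructure to test
    assert build_greeting("world") == "Hello, world!"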
Recommended operator architecture
The DCH application system is flexible about how the operator is built. The only hard requirement is an entry point script with an annotation that marks the function the operator starts from. We do, however, have recommendations for how to set up your operator:
Use poetry as a package manager.
Decouple data input and output from operator logic.
Only read and write data in the entry point script.
Verify that inputs exist and are valid in the entry point.
Separate operator business logic.
Develop the business logic inside the src folder.
This makes the logic easier to test.
Allows the logic to be more portable.
Lets you run the operator logic locally more easily.
Write tests for the business logic.
Using whatever framework or library you prefer.
Write input and output tests for the entry point
These tests must use DCH/Senaps custom library code to mimic the runtime environment.
This should only be testing read/write/document validation and not the business logic.
Define config classes that map to input documents (see the sketch after this list).
Use type hints.
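A minimal sketch of such a config class; the field names are illustrative and would mirror whatever JSON your application-configuration document actually contains:
from dataclasses import dataclass

@dataclass
class ApplicationConfiguration:
    # Mirrors the JSON structure of the application-configuration input document
    building_id: str
    resample_minutes: int = 15

    @classmethod
    def from_json(cls, raw: dict) -> "ApplicationConfiguration":
        # Fail early in the entry point if a required field is missing or invalid
        return cls(
            building_id=raw["building_id"],
            resample_minutes=int(raw.get("resample_minutes", 15)),
        )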
Entry point script
This is the first interaction DCH has with the operator. You can name the script whatever you like, because you specify it in the manifest file (see below). We suggest placing it at the root of the operator folder and naming it [operator_name]_entry.py. The entry script in our example is operator/operator_entry.py.
Annotation to specify entry function
The entry script uses an annotation, @model("hello_world"), to tell the DCH infrastructure which function to execute first. The string in the annotation must match the operator id used in the recipe file and the manifest file.
@model("hello_world")
def operator_entrypoint(context):
    # Code to manage inputs and outputs
    ...
To decouple the operator code from the infrastructure code, we suggest that the entry script handles all the inputs and outputs of the application and delegates all the business logic to separate modules.
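Putting this together, here is a rough sketch of what such an entry script can look like. The data client functions are the ones shown in the Read input and Write output sections below; the data_clients.document import path for get_document_port_json, the process_energy helper, and the Input/Output port enums are assumptions for illustration, as is the omitted import of the @model annotation (use the one from the hello world example entry script):
from data_clients.stream import get_data_from_stream
from data_clients.ports import get_stream_id_from_port
from data_clients.document import get_document_port_json, write_to_document  # document client path assumed
from src.business_logic import process_energy  # hypothetical business-logic module under src

@model("hello_world")
def operator_entrypoint(context):
    # Read and validate inputs at the boundary
    configuration = get_document_port_json(context, Input.application_configuration)
    stream_id = get_stream_id_from_port(context, Input.energy)
    energy_data = get_data_from_stream(context, stream_id)
    # Delegate all business logic to a separate, easily testable module
    result = process_energy(configuration, energy_data)
    # Write outputs back through the document data client
    write_to_document(context, result, Output.application_results)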
Read input
This section describes how to access and read inputs. The operator is deployed with access to document nodes and data streams that you can use in the operator.
Supported inputs
The DCH application operator can get access to:
Single Data Stream
Corresponds to STREAM node type in the recipe file.
Multiple Data Streams - This is a collection of data streams.
Corresponds to the STREAM_COLLECTION node type in the recipe file.
Document - A document node contains a free-text string value, less than 16 MB.
Use with the DOCUMENT node type in the recipe file.
Use with the MODEL_METADATA node type in the recipe file.
Use with the BRIQL_RESULT node type in the recipe file.
Read string from document
You can use the document data client to read the value of a document port.
from io import StringIO
from data_clients.document import get_document_port_value  # import path assumed, based on the other document client examples
raw_document_data = StringIO(get_document_port_value(context, Input.tariff_data))
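Because the value is wrapped in StringIO, it can be handed straight to pandas if, for example, the tariff document contains CSV text (an assumption about this particular document's format):
import pandas
tariff_data = pandas.read_csv(raw_document_data)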
Read json from document
You can use the document data client to read the value of a document port as JSON.
from data_clients.document import get_document_port_json  # import path assumed, based on the other document client examples
configuration_json = get_document_port_json(context, Input.application_configuration)
Read single stream
You can use the port client to get the stream id and the stream data client to read a stream.
import pandas
from data_clients.stream import get_data_from_stream
from data_clients.ports import get_stream_id_from_port
stream_id = get_stream_id_from_port(context, Input.energy)
# Reading all the data from a single stream (max 500_000 rows)
stream_data: pandas.DataFrame = get_data_from_stream(context, stream_id)
Read collection of streams
You can use the port client to get the stream ids and the stream data client to read a collection of streams.
from data_clients.stream import get_data_from_streams
from data_clients.ports import get_stream_collection_ids
energy_stream_collection_ids = get_stream_collection_ids(context, Input.energy_collection)
# Reading the data from all the streams in a collection (max 500_000 rows)
energy_stream_collection_data = get_data_from_streams(context, energy_stream_collection_ids)
Write output
This section describes how to write to outputs. The operator is deployed with access to document nodes and data streams that you can write to from the operator.
Supported outputs
The DCH application operator can write data to:
A Single Data Stream
Corresponds to STREAM node type in the recipe file.
Document - A document node in any format, less than 16 MB.
Use with the DOCUMENT node type in the recipe file.
Write string to document
You can use the document data client to write a string value to a document port.
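A minimal sketch, assuming write_to_document (shown in the JSON example below) also accepts a plain string value; the report text and the reuse of the application-results port are purely illustrative:
from data_clients.document import write_to_document
# Write a plain string to the output document port
report_text = "Hello from the operator"
write_to_document(context, report_text, Output.application_results)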
Write JSON to document
You can use the document data client to write the value to a document port. The function is write_to_document(context, value, port_id).
from data_clients.document import write_to_document
# Write to document
result = {
    "hello": hello
}
write_to_document(context, result, Output.application_results)
Write single stream
You can use the port client to get the stream id and the stream data client to write to a stream. The function is write_observations_series_to_stream_id(context, pandas.DataFrame, stream_id).
from data_clients.stream import write_observations_series_to_stream_id
from data_clients.ports import get_stream_id_from_port
output_stream_id = get_stream_id_from_port(context, Output.energy_stream)
fake_data = generate_data_for_past_year(data_column_name=Output.energy_stream)
write_observations_series_to_stream_id(context, fake_data[Output.energy_stream], output_stream_id)
Operator Manifest
The operator needs a manifest file, manifest.json. The operator manifest describes the data the operator expects and is specific to the deployment platform the code runs on. In the manifest file you set the entry point for the operator, the dependencies, and the base image. The manifest file also has to specify the ports the operator is connected to.
Choose operator base image
Set the base image. Select the image based on the Python version your operator is written for, and ensure you choose the image with the correct pandas version. See the list below to find the image you want to use.
Python 3.9 - With TensorFlow
Image: PYTHON3
Tags: pandas, python 3, python 3.9, numpy, tensorflow 2.14.0
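For example, the chosen image is then referenced by name in the manifest; the exact field name below (baseImage) is an assumption and may differ in the DCH manifest schema:
"baseImage": "PYTHON3"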
Set entry point
The entrypoint field must point to the entry point script referred to earlier in this walkthrough.
"entrypoint": "operator_entry.py"
Add dependencies
The manifest file has to specify the libraries to add when the operator image is generated. You can specify the provider as PIP or APT, and the name of the package with a version specification following the pattern of that provider.
"dependencies": [
{"provider":"PIP","name":"faker==26.0.0"}
]
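APT packages follow the same structure; the package name below is purely illustrative:
"dependencies": [
    {"provider": "PIP", "name": "faker==26.0.0"},
    {"provider": "APT", "name": "libgomp1"}
]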
Specify the operator ports
The manifest file contains a list of models; each model specifies an operator. The id and version must match the operator id and version used in the recipe file. In addition you must specify the ports used by the operator. Each port must have a name (which must match the name used in the recipe file), a direction (input or output), and a port type (document, stream or stream[]). The required field must be present but should be set to false; whether an input is required is handled in the BRIQL query.
"models": [
{
"id": "hello_world",
"version": "0.0.13",
"ports": [
{
"portName": "application-configuration",
"required": false,
"type": "document",
"direction": "input"
},
{
"portName": "energy",
"required": false,
"direction": "input",
"type": "stream"
},
{
"portName": "energy_collection",
"required": false,
"direction": "input",
"type": "stream[]"
},
{
"portName": "energy-out",
"required": false,
"direction": "output",
"type": "stream"
},
{
"portName": "application-results",
"required": false,
"type": "document",
"direction": "output"
},
]
}
]
Writing multiple operators
There are a few different ways to write multiple operators.
In your entry point script, have multiple entry point functions and map each annotation to a different model in the manifest file; these in turn must match the operators specified in the recipe file (see the sketch after this list).
Copy the example operator folder and develop your other operators in the copy. The DCH application system supports bundling multiple operators with your application.
Use a separate operator in the recipe file. You can upload an operator in another context and still use it in your application recipe file.
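As an illustration of the first approach, a single entry script can declare several annotated functions; the second operator id (energy_report) below is purely illustrative and would need matching entries in the manifest and recipe files:
@model("hello_world")
def hello_world_entrypoint(context):
    # Inputs, outputs and business logic for the hello_world operator
    ...

@model("energy_report")
def energy_report_entrypoint(context):
    # Inputs, outputs and business logic for the hypothetical energy_report operator
    ...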
Writing entry point tests (Coming soon)
The hello world application demo does not yet have any tests for the entry point; they will be added soon.