Getting started

This tutorial explains how to implement an executor in Python. It also provides insight into the internal workings of ColonyOS, which makes it easier to use pre-developed executors, such as the various container executors.

First, follow the instructions at Getting started and install the colonies binary.

Starting a Colonies server

First, follow the instructions at Docker-compose. In short:

git clone https://github.com/colonyos/colonies.git
cd colonies
source docker-compose.env
docker-compose up

Installing Python SDK

Note that the SDK has only been tested on Linux and macOS.

pip3 install pycolonies

Connecting to a Colonies server

The code below configures a Python client to connect to a Colonies server running at localhost:50080.

from pycolonies import Colonies

colonies = Colonies("localhost", 50080)

To use the client and the Python SDK, the following settings are required:

  • The name of a colony.

  • A valid private key of a user or executor that is a member of the colony.

  • Optionally, a colony private key to register new executors or users.

An alternative method is to use the colonies_client() function. It parses the following environment variables:

  • COLONIES_SERVER_HOST

  • COLONIES_SERVER_PORT

  • COLONIES_SERVER_TLS

  • COLONIES_COLONY_NAME

  • COLONIES_EXECUTOR_NAME

  • COLONIES_PRVKEY

  • COLONIES_COLONY_PRVKEY

Just make sure to source docker-compose.env before starting the Python script.

from pycolonies import colonies_client

colonies, colonyname, colony_prvkey, executorname, prvkey = colonies_client()
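
For illustration, the sketch below shows roughly how settings like these could be read from the environment. It is not the SDK's actual implementation: the helper name read_colonies_env and the parsing rules (port converted to an integer, TLS enabled only when the variable equals "true") are assumptions made for this example.

```python
import os

def read_colonies_env(env=os.environ):
    """Hypothetical helper: collect the Colonies settings from a mapping."""
    required = ("COLONIES_SERVER_HOST", "COLONIES_SERVER_PORT")
    missing = [name for name in required if name not in env]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))

    return {
        "host": env["COLONIES_SERVER_HOST"],
        "port": int(env["COLONIES_SERVER_PORT"]),
        # Assumption: TLS is switched on with the literal string "true"
        "tls": env.get("COLONIES_SERVER_TLS", "false").lower() == "true",
        "colonyname": env.get("COLONIES_COLONY_NAME"),
        "executorname": env.get("COLONIES_EXECUTOR_NAME"),
        "prvkey": env.get("COLONIES_PRVKEY"),
        "colony_prvkey": env.get("COLONIES_COLONY_PRVKEY"),
    }

# Example with an explicit mapping instead of the real environment
settings = read_colonies_env({
    "COLONIES_SERVER_HOST": "localhost",
    "COLONIES_SERVER_PORT": "50080",
    "COLONIES_SERVER_TLS": "false",
})
print(settings["host"], settings["port"], settings["tls"])
```

Passing the mapping as an argument keeps the helper easy to try out without changing the real environment.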

Registering an executor

The first step to register an executor is to create a new ECDSA key pair.

from pycolonies import Crypto

crypto = Crypto()
executor_prvkey = crypto.prvkey()
executorid = crypto.id(executor_prvkey)

The executor_id is essentially a SHA3-256 hash of the public key, which itself is generated from the private key. The Colonies server derives the public key from message signatures to reconstruct the executor_id. It then searches for the executor_id in its internal database to verify whether the executor is an authorized member of the colony. The executor_id must therefore be specified when registering a new executor. Additionally, only the colony owner has the authority to add an executor. Consequently, the add_executor() function requires the colony’s private key as an argument.
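
The hashing step can be sketched with Python's standard library. This is only an illustration of hashing a public key with SHA3-256: the function name derive_id, the placeholder key, and the choice to hash the UTF-8 bytes of the hex-encoded key are assumptions, and the exact encoding used by ColonyOS may differ. In practice, crypto.id() should be used.

```python
import hashlib

def derive_id(public_key_hex: str) -> str:
    """Return the SHA3-256 digest of a public key as a hex string.

    Assumption: the digest is computed over the UTF-8 bytes of the
    hex-encoded key. ColonyOS may encode the key differently.
    """
    return hashlib.sha3_256(public_key_hex.encode("utf-8")).hexdigest()

# Placeholder value, not a real ECDSA public key
example_public_key = "04" + "ab" * 64

executor_id = derive_id(example_public_key)
print(len(executor_id))  # 64 hex characters, i.e. 256 bits
```

The result has the same length as the 64-character identifiers that appear elsewhere in this tutorial.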

Note that adding an executor is done in two steps. Once an executor has been added, it needs to be approved by calling the approve_executor() function before it can receive process assignments.

import os

executor = {
    "executorname": executorname,
    "executorid": executorid,
    "colonyname": colonyname,
    "executortype": "helloworld-executor"
}

try:
    # Adding and approving an executor requires the colony's private key
    executor = colonies.add_executor(executor, colony_prvkey)
    colonies.approve_executor(colonyname, executorname, colony_prvkey)

    # The executor registers its function with its own private key
    colonies.add_function(colonyname,
                          executorname,
                          "helloworld",
                          executor_prvkey)
except Exception as err:
    print(err)
    os._exit(0)

print("Executor", executorname, "registered")

Process assignments

An executor can get a process assignment by calling the assign() function. The function takes the colony name, a timeout specifying how long the call should block (10 seconds in this case), and executor_prvkey as arguments.

process = colonies.assign(colonyname, 10, executor_prvkey)

The process object contains the following information:

{'processid': '9a580b18c9a6cb9716097ff02dd54b4bb18177e8241fafc1f5c919e421b5b586',
 'initiatorid': '3fc05cf3df4b494e95d6a3d297a34f19938f7daa7422ab0d4f794454133341ac',
 'initiatorname': 'myuser',
 'assignedexecutorid': '50900712cb50da8e14ef45aabb037c56a94264a6948da71344228645f2267a21',
 'isassigned': True,
 'state': 1,
 'prioritytime': 1703010866089369404,
 'submissiontime': '2023-12-19T19:34:26.089369+01:00',
 'starttime': '2023-12-19T18:35:43.513739591Z',
 'endtime': '0001-01-01T00:53:28+00:53',
 'waitdeadline': '0001-01-01T00:53:28+00:53',
 'execdeadline': '0001-01-01T00:53:28+00:53',
 'retries': 0,
 'attributes': [],
 'spec': {'nodename': '', 'funcname': 'helloworld', 'args': [], 'kwargs': {}, 'priority': 0, 'maxwaittime': -1, 'maxexectime': -1, 'maxretries': 0, 'conditions': {'colonyname': 'dev', 'executorids': [], 'executortype': 'helloworld-executor', 'dependencies': [], 'nodes': 0, 'cpu': '', 'processes': 0, 'processespernode': 0, 'mem': '', 'storage': '', 'gpu': {'name': '', 'mem': '', 'count': 0, 'nodecount': 0}, 'walltime': 0}, 'label': '', 'fs': {'mount': '', 'snapshots': None, 'dirs': None}, 'env': {}},
 'waitforparents': False,
 'parents': [],
 'children': [],
 'processgraphid': '',
 'in': [],
 'out': [],
 'errors': []}
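
As a small illustration of how an executor might read this object, the sketch below pulls out the fields that usually matter when deciding what to run. The helper name summarize is hypothetical, and the dictionary is a trimmed copy of the object shown above.

```python
def summarize(process: dict) -> dict:
    """Hypothetical helper: extract the fields an executor typically needs."""
    spec = process["spec"]
    return {
        "processid": process["processid"],
        "funcname": spec["funcname"],
        "args": spec["args"],
        "kwargs": spec["kwargs"],
        "executortype": spec["conditions"]["executortype"],
        "isassigned": process["isassigned"],
    }

# Trimmed copy of the process object shown above
process = {
    "processid": "9a580b18c9a6cb9716097ff02dd54b4bb18177e8241fafc1f5c919e421b5b586",
    "isassigned": True,
    "spec": {
        "funcname": "helloworld",
        "args": [],
        "kwargs": {},
        "conditions": {"colonyname": "dev", "executortype": "helloworld-executor"},
    },
}

summary = summarize(process)
print(summary["funcname"], summary["executortype"])
```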

Once a process is assigned to an executor, that executor gains exclusive access to it. Each process can be assigned to only one executor, and only the executor assigned to a specific process has the right to add logs, attributes or close it. The following code demonstrates how to close an assigned process with the output helloworld.

if process["spec"]["funcname"] == "helloworld":
    colonies.close(process["processid"], ["helloworld"], executor_prvkey)
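
The exclusive-access rule can be pictured with a small in-memory model. This is a toy sketch, not how the Colonies server is implemented, and the class ToyQueue and its methods are hypothetical names used only here.

```python
class ToyQueue:
    """In-memory stand-in for a server; not part of pycolonies."""

    def __init__(self):
        self.waiting = []   # process ids that are not yet assigned
        self.assigned = {}  # process id -> id of the executor holding it
        self.closed = {}    # process id -> output

    def submit(self, processid):
        self.waiting.append(processid)

    def assign(self, executorid):
        # Each process is handed to exactly one executor
        if not self.waiting:
            raise RuntimeError("no waiting process")
        processid = self.waiting.pop(0)
        self.assigned[processid] = executorid
        return processid

    def close(self, processid, output, executorid):
        # Only the executor holding the process may close it
        if self.assigned.get(processid) != executorid:
            raise PermissionError("process is not assigned to this executor")
        del self.assigned[processid]
        self.closed[processid] = output


queue = ToyQueue()
queue.submit("p1")
pid = queue.assign("executor-a")

try:
    queue.close(pid, ["helloworld"], "executor-b")  # wrong executor
except PermissionError as err:
    print(err)

queue.close(pid, ["helloworld"], "executor-a")      # the assigned executor
print(queue.closed)
```

In the real system the executor is identified by the signature made with its private key rather than by a plain string.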

Complete example

from pycolonies import Crypto
from pycolonies import Colonies
from pycolonies import colonies_client
import signal
import os
import uuid

class PythonExecutor:
    def __init__(self):
        colonies, colonyname, colony_prvkey, executorname, prvkey = colonies_client()
        self.colonies = colonies
        self.colonyname = colonyname
        self.colony_prvkey = colony_prvkey
        self.executorname = "helloworld-executor"
        self.executortype = "helloworld-executor"

        crypto = Crypto()
        self.executor_prvkey = crypto.prvkey()
        self.executorid = crypto.id(self.executor_prvkey)

        self.register()

    def register(self):
        executor = {
            "executorname": self.executorname,
            "executorid": self.executorid,
            "colonyname": self.colonyname,
            "executortype": self.executortype
        }

        try:
            executor = self.colonies.add_executor(executor, self.colony_prvkey)
            self.colonies.approve_executor(self.colonyname, self.executorname, self.colony_prvkey)

            self.colonies.add_function(self.colonyname,
                                       self.executorname,
                                       "helloworld",
                                       self.executor_prvkey)
        except Exception as err:
            print(err)
            os._exit(0)

        print("Executor", self.executorname, "registered")

    def start(self):
        while True:
            try:
                # Block for up to 10 seconds waiting for an assignment
                process = self.colonies.assign(self.colonyname, 10, self.executor_prvkey)
                print("Process", process["processid"], "is assigned to executor")
                if process["spec"]["funcname"] == "helloworld":
                    self.colonies.close(process["processid"], ["helloworld"], self.executor_prvkey)
            except Exception as err:
                # Print the error and keep polling
                print(err)

    def unregister(self):
        self.colonies.remove_executor(self.colonyname, self.executorname, self.colony_prvkey)
        print("Executor", self.executorname, "unregistered")
        os._exit(0)

def sigint_handler(signum, frame):
    executor.unregister()

if __name__ == '__main__':
    signal.signal(signal.SIGINT, sigint_handler)
    executor = PythonExecutor()
    executor.start()

Start the executor by typing:

python3 helloworld_executor.py
Executor fca266fa7a5ca88a60129f6d19f189ce6f8ba086ec9b06e7eebe9350bd777dc0 registered

To call the helloworld function, submit the following function specification, saved here as helloworld.json:

{
    "conditions": {
        "executortype": "helloworld-executor"
    },
    "funcname": "helloworld"
}

Note that the executortype must match the type of the executor, which in our case is helloworld-executor.
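
If it is more convenient to generate the specification file from a script, a minimal sketch using only the standard json module could look like this. The file name helloworld.json is the one passed to the colonies command in this tutorial; the variable names are illustrative.

```python
import json

# Same content as the function specification shown above
func_spec = {
    "conditions": {
        "executortype": "helloworld-executor"
    },
    "funcname": "helloworld"
}

spec_text = json.dumps(func_spec, indent=4)

# Write the specification to the file used by the colonies CLI
with open("helloworld.json", "w") as f:
    f.write(spec_text + "\n")

print(spec_text)
```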

colonies function submit --spec helloworld.json

Below is an alternative method to call the function:

colonies function exec --func helloworld --targettype helloworld-executor

It is also possible to submit function specifications directly in Python. Save the Python code below to a new file called helloworld.py.

from pycolonies import FuncSpec, Conditions
from pycolonies import colonies_client

colonies, colonyname, colony_prvkey, executorname, prvkey = colonies_client()

func_spec = FuncSpec(
     funcname="helloworld",
     conditions = Conditions(
         colonyname=colonyname,
         executortype="helloworld-executor"
     ),
     maxexectime=100,
     maxretries=3
)

# submit the function spec to the colonies server
process = colonies.submit(func_spec, prvkey)
print("Process", process["processid"], "submitted")

# wait for the process to be executed
process = colonies.wait(process, 10, prvkey)
print(process["out"][0])

python3 helloworld.py
Process bacf4309da2f19db96e21c4ed16cda0b41e7045e6e81550d90d679725855ee71 submitted
helloworld

Logs

Logs can simply be added by calling the add_log() function. Note that only the assigned executor may add logs.

self.colonies.add_log(process["processid"], "Hello from executor\n", self.executor_prvkey)

colonies function submit --spec helloworld.json --follow
INFO[0000] Process submitted                             ProcessId=c107281fceed8c7636debecca996bc8f714ca2301087e99a26fb7b93d5e5b4f9
INFO[0000] Printing logs from process                    ProcessId=c107281fceed8c7636debecca996bc8f714ca2301087e99a26fb7b93d5e5b4f9
Hello from executor
INFO[0001] Process finished successfully                 ProcessId=c107281fceed8c7636debecca996bc8f714ca2301087e99a26fb7b93d5e5b4f9

Workflows

Workflows are simply function specifications with dependencies. The Python code below creates a workflow with the following shape.

[Figure: workflow.png, a two-node workflow in which hello2 depends on hello1]

from pycolonies import colonies_client
from pycolonies import Workflow, FuncSpec, Conditions

colonies, colonyname, colony_prvkey, executorname, prvkey = colonies_client()

hello1_func_spec = FuncSpec(
    funcname="helloworld",
    nodename="hello1",
    conditions=Conditions(
        colonyname=colonyname,
        executortype="helloworld-executor"
    ),
    maxexectime=100,
    maxretries=3
)

# hello2 lists hello1 as a dependency and therefore runs after it
hello2_func_spec = FuncSpec(
    funcname="helloworld",
    nodename="hello2",
    conditions=Conditions(
        colonyname=colonyname,
        executortype="helloworld-executor",
        dependencies=["hello1"]
    ),
    maxexectime=100,
    maxretries=3
)

wf = Workflow(colonyname=colonyname)
wf.functionspecs.append(hello1_func_spec)
wf.functionspecs.append(hello2_func_spec)

processgraph = colonies.submit_workflow(wf, prvkey)
print("Workflow", processgraph.processgraphid, "submitted")

The hello1 node must execute before the hello2 process can run. Note that each process may call a function on any executor that is part of the same colony. This feature enables the implementation of cross-platform workflows that operate seamlessly across a continuum of executors.
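
The ordering implied by the dependencies can be illustrated with a topological sort from the Python standard library (graphlib, available from Python 3.9). This only demonstrates the ordering constraint; it is not a description of how the Colonies server schedules workflows.

```python
from graphlib import TopologicalSorter

# Node name -> names of the nodes it depends on
dependencies = {
    "hello1": [],
    "hello2": ["hello1"],
}

# static_order() yields every node after all of its dependencies
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['hello1', 'hello2']
```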

Exercises

Execution constraints

Change the maxexectime attribute in the function specification to 20 seconds, and add a delay with time.sleep(30) in the executor before it closes the process.

import time

def start(self):
    while True:
        try:
            process = self.colonies.assign(self.colonyname, 10, self.executor_prvkey)

            time.sleep(30)  # the helloworld function now takes 30 seconds to complete

            print("Process", process["processid"], "is assigned to executor")
            if process["spec"]["funcname"] == "helloworld":
                self.colonies.close(process["processid"], ["helloworld"], self.executor_prvkey)
        except Exception as err:
            print(err)

In this case, the process is unassigned from the executor once the 20-second limit is exceeded and is immediately reassigned to the same executor. This is repeated 3 times (maxretries) before the process is closed as failed.
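
The expected outcome can be reasoned about with a toy model. The function simulate below is hypothetical and deliberately simplified: it assumes that a negative maxexectime means no limit (as the -1 in the process object above suggests) and that a process fails once it has been retried maxretries times. The server's actual bookkeeping may differ.

```python
def simulate(exec_time, maxexectime, maxretries):
    """Return (final_state, retries) for a toy model of execution deadlines.

    Assumptions: a negative maxexectime means no limit, and the process
    is closed as failed after it has been retried maxretries times.
    """
    retries = 0
    while True:
        if maxexectime < 0 or exec_time <= maxexectime:
            return "successful", retries
        if retries >= maxretries:
            return "failed", retries
        retries += 1  # deadline exceeded: unassign and hand out again

print(simulate(30, 20, 3))  # ('failed', 3)
print(simulate(10, 20, 3))  # ('successful', 0)
```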

Load balancing

Make it possible to start multiple executors, either by setting the executorname to a random value or by accepting an executor name as an argument to the helloworld_executor.py script. Submit several function specifications and notice how they are load balanced between the executors.
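
One possible starting point for the naming part of this exercise is sketched below. The helper choose_executor_name and the helloworld-executor- prefix are illustrative choices, not part of the SDK; the returned value would replace the hard-coded self.executorname in the complete example.

```python
import sys
import uuid

def choose_executor_name(argv):
    """Hypothetical helper: use the first argument, else a random name."""
    if len(argv) > 1:
        return argv[1]
    # A short random suffix is enough to tell the executors apart
    return "helloworld-executor-" + uuid.uuid4().hex[:8]

if __name__ == "__main__":
    print(choose_executor_name(sys.argv))
```

With this in place, each running instance registers under a different name, for example by starting the script once with the argument worker-1 and once with worker-2.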