Examples

This section contains code examples. Most of them require a specific Mesh model on the server side. Best practice is to copy the example scripts you are interested in, together with helpers.py.

Best practices

  1. Check out, or open in GitHub, the git tag corresponding to the Mesh Python SDK version you are using. E.g. for Mesh Python SDK v1.7 it is: https://github.com/Volue-Public/energy-mesh-python/tree/v1.7.0

  2. Copy either the whole examples directory or the specific example script(s), together with helpers.py, and paste them into your own workspace.

  3. Run an example:

    python .\examples\get_version.py localhost:50051 c:\certificate.pem
    

    First argument - Mesh server address with port. The default value is localhost:50051.

    Second argument - path to the PEM-encoded TLS certificate used by the Mesh server. Skip it if the Mesh server is configured to accept insecure gRPC connections. If provided, make sure the connection to Mesh is established using volue.mesh.Connection.with_tls() instead of volue.mesh.Connection.insecure().
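
    For reference, a minimal sketch of what a helpers.get_connection_info() implementation could look like, assuming it simply parses these two optional command-line arguments (hypothetical; the actual helpers.py in the repository may differ, e.g. it may read the certificate file contents instead of returning the path):

    import sys


    def get_connection_info():
        # First argument: Mesh server address with port (default: localhost:50051).
        address = sys.argv[1] if len(sys.argv) > 1 else "localhost:50051"
        # Second (optional) argument: path to the PEM-encoded TLS certificate.
        tls_root_pem_cert = sys.argv[2] if len(sys.argv) > 2 else None
        return address, tls_root_pem_cert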

Note

Starting from Mesh Python SDK 1.9, all examples establish the connection to the Mesh server using volue.mesh.Connection.insecure(). To use a different connection type, e.g. with TLS, change the example script accordingly. The PEM-encoded TLS certificate passed as the second argument is ignored when an insecure connection is used.

Quickstart

import helpers

from volue.mesh import Connection


def main(address, tls_root_pem_cert):
    """Showing the quickest way to get started."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    # Which version is the server running
    version_info = connection.get_version()
    print(f"Connected to {version_info.name} {version_info.version}")

    # Create a remote session on the Volue Mesh server
    session = connection.create_session()
    session.open()
    print("You have now an open session and can request time series")

    # Close the remote session
    session.close()


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)

Authorization

import asyncio

import helpers

import volue.mesh.aio
from volue import mesh


def sync_auth(address, tls_root_pem_cert, service_principal, user_principal):
    print("Synchronous authentication example: ")

    # Providing the TLS root certificate (`tls_root_pem_cert`) is optional.
    # If set to `None`, root certificates will be retrieved from a default
    # location chosen by the gRPC runtime.
    connection = mesh.Connection.with_kerberos(
        address, tls_root_pem_cert, service_principal, user_principal
    )

    user_identity = connection.get_user_identity()
    print(user_identity)

    # revoke the no longer used token
    connection.revoke_access_token()


async def async_auth(address, tls_root_pem_cert, service_principal, user_principal):
    print("Asynchronous authentication example:")

    # Providing the TLS root certificate (`tls_root_pem_cert`) is optional.
    # If set to `None`, root certificates will be retrieved from a default
    # location chosen by the gRPC runtime.
    connection = mesh.aio.Connection.with_kerberos(
        address, tls_root_pem_cert, service_principal, user_principal
    )

    user_identity = await connection.get_user_identity()
    print(user_identity)

    # revoke the no longer used token
    await connection.revoke_access_token()


def main(address, tls_root_pem_cert):
    """Showing how to authorize to gRPC Mesh server."""

    # If the Mesh gRPC server is running as a service user,
    # for example LocalSystem, NetworkService or a user account
    # with a registered service principal name, then it is enough
    # to provide the hostname as the service principal, e.g.:
    #   'HOST/hostname.companyad.company.com'
    # If the Mesh gRPC server is running as a user account without
    # a registered service principal name, then it is enough to provide
    # the user account name running the Mesh server as the service principal, e.g.:
    #   'ad\\user.name' or r'ad\user.name'
    # Note: winkerberos converts the service principal name if provided in
    #       RFC-2078 format. '@' is converted to '/' if there is no '/'
    #       character in the service principal name. E.g.:
    #           service@hostname
    #       Would be converted to:
    #           service/hostname
    service_principal = "HOST/hostname.companyad.company.com"
    user_principal = None

    sync_auth(address, tls_root_pem_cert, service_principal, user_principal)
    asyncio.run(
        async_auth(address, tls_root_pem_cert, service_principal, user_principal)
    )


if __name__ == "__main__":
    # This will authenticate Python client (receive authorization token from Mesh),
    # then send gRPC request that requires authorization (e.g.: GetUserIdentity)
    # and print the result. If your user name info is printed, you have successfully
    # communicated with the server.
    #
    # This requires Mesh server to be running with enabled TLS and Kerberos options.

    address, tls_root_pem_cert = helpers.get_connection_info()
#    main(address, tls_root_pem_cert)

Connect

import helpers

from volue.mesh import Connection


def get_version(connection):
    """Showing how to send get the server version."""
    print("1. Requesting server version")
    version = connection.get_version()
    print(f"2. Server version is {version.version}")


def start_and_end_session(session):
    """Showing how to start and end a session."""
    print("A. Starting session")
    session.open()
    print("B. Ending session")
    session.close()


def main(address, tls_root_pem_cert):
    """Showing how to connect to a server and run two tasks sequentially."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    get_version(connection)
    start_and_end_session(connection.create_session())


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)
    print("Done")

# Outputs:
# 1. Requesting server version
# 2. Server version is 1.12.5.0-dev
# A. Starting session
# B. Ending session
# Done

Connect, asynchronously

import asyncio

import helpers

from volue.mesh.aio import Connection


async def get_version(connection):
    """Showing how to get the server version."""
    print("1. Requesting server version")
    version = await connection.get_version()
    print(f"2. Server version is {version.version}")


async def start_and_end_session(session):
    """Showing how to start and end a session."""
    print("A. Starting session")
    await session.open()
    print("B. Ending session")
    await session.close()


async def main(address, tls_root_pem_cert):
    """Showing how to connect to a server and run two tasks concurrently."""
    # Creating a connection, but not sending any requests yet.

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    # Indicate that these two functions can be run concurrently.
    await asyncio.gather(
        get_version(connection), start_and_end_session(connection.create_session())
    )


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    asyncio.run(main(address, tls_root_pem_cert))
    print("Done")

# Outputs:
# 1. Requesting server version
# A. Starting session
# 2. Server version is 1.12.5.0-dev
# B. Ending session
# Done

Connect using external access token

from datetime import datetime

import helpers

from volue.mesh import Connection


def main(address, tls_root_pem_cert):
    """
    Showing how to authorize to gRPC Mesh server using externally obtained
    access token, e.g.: an OAuth JWT. Obtaining the access token is out of scope
    for this example.

    Depending on your environment, e.g.: Microsoft Entra ID, using libraries
    like Microsoft Authentication Library (MSAL) for getting the tokens is
    suggested.
    """

    token = "my_token"

    # Providing the TLS root certificate (`tls_root_pem_cert`) is optional.
    # If set to `None`, root certificates will be retrieved from a default
    # location chosen by the gRPC runtime.
    connection = Connection.with_external_access_token(
        address, tls_root_pem_cert, access_token=token
    )

    with connection.create_session() as session:
        # Print user information contained in the access token.
        user_identity = connection.get_user_identity()
        print(user_identity)

        # Read some time series data.
        # This requires that the user has time series read permissions.
        timeseries_key = 1388
        timeseries = session.read_timeseries_points(
            timeseries_key, datetime(2014, 1, 1), datetime(2015, 1, 1)
        )
        print(timeseries.arrow_table.to_pandas())

        # For long running sessions it may be necessary to refresh the access
        # token.
        # One possibility is to catch grpc.RpcError with status code
        # UNAUTHENTICATED, then get a new access token and update it in the
        # Mesh connection using `update_external_access_token`.
        connection.update_external_access_token("my_new_access_token")


if __name__ == "__main__":
    # This requires Mesh server to be running with enabled TLS and OAuth options.
    # Obtaining access token is out of the scope for this example.

    address, tls_root_pem_cert = helpers.get_connection_info()
    # main(address, tls_root_pem_cert)
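
The token-refresh approach mentioned in the comments above could look roughly like the sketch below. obtain_new_token() is a hypothetical placeholder for your own token acquisition logic (e.g. via MSAL); the status-code check uses the standard grpc API.

import grpc


def read_with_token_refresh(connection, session, target, start_time, end_time):
    """Sketch: retry a read once after refreshing an expired access token."""
    try:
        return session.read_timeseries_points(target, start_time, end_time)
    except grpc.RpcError as e:
        if e.code() != grpc.StatusCode.UNAUTHENTICATED:
            raise
        # The access token has expired: refresh it and retry the request once.
        # obtain_new_token() is a hypothetical user-provided helper.
        connection.update_external_access_token(obtain_new_token())
        return session.read_timeseries_points(target, start_time, end_time)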

Get version

import asyncio

import helpers

import volue.mesh.aio
from volue import mesh


def sync_get_version(address, tls_root_pem_cert):
    print("Synchronous get version:")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(address, tls_root_pem_cert)
    connection = mesh.Connection.insecure(address)

    version_info = connection.get_version()
    print(version_info.version)


async def async_get_version(address, tls_root_pem_cert):
    print("Asynchronous get version:")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.aio.Connection.with_tls(address, tls_root_pem_cert)
    connection = mesh.aio.Connection.insecure(address)

    version_info = await connection.get_version()
    print(version_info.version)


if __name__ == "__main__":
    # This will request and print version info from the mesh server.
    # If some sensible version info is printed, you have successfully
    # communicated with the server.
    address, tls_root_pem_cert = helpers.get_connection_info()
    sync_get_version(address, tls_root_pem_cert)
    asyncio.run(async_get_version(address, tls_root_pem_cert))

Read time series

import uuid
from datetime import datetime

import helpers

from volue.mesh import Connection


def read_timeseries_points(session: Connection.Session):
    """Showing how to read time series points."""

    # Define the time series identifier, it can be:
    # - time series key of a physical time series
    # - path of a time series attribute that is connected to a physical time series
    # - ID of a time series attribute that is connected to a physical time series
    timeseries_key = 3
    timeseries_attribute_path = "Model/SimpleThermalTestModel/ThermalComponent/SomePowerPlant1/SomePowerPlantChimney2.TsRawAtt"
    timeseries_attribute_id = uuid.UUID("e5df77a9-8b60-4b0a-aa1b-3c3957c538a0")

    # Defining a time interval to read time series from.
    # If no time zone is provided then it will be treated as UTC.
    start = datetime(2016, 1, 1, 6, 0, 0)
    end = datetime(2016, 1, 1, 8, 0, 0)

    # Each time series point occupies 20 bytes. By default gRPC has a 4MB inbound message size limit.
    # For larger data volumes please read the data in chunks,
    # e.g. call `read_timeseries_points` multiple times with shorter intervals.

    # Send a request to read time series based on the time series key.
    timeseries = session.read_timeseries_points(
        target=timeseries_key, start_time=start, end_time=end
    )
    print(f"Read {timeseries.number_of_points} points using time series key.")
    print(timeseries.arrow_table.to_pandas())

    # Send a request to read time series based on the time series attribute path.
    timeseries = session.read_timeseries_points(
        target=timeseries_attribute_path, start_time=start, end_time=end
    )
    print(
        f"Read {timeseries.number_of_points} points using time series attribute path."
    )
    print(timeseries.arrow_table.to_pandas())

    # Send a request to read time series based on the time series attribute ID.
    # Attribute IDs are auto-generated when an object is created.
    # That is why we can't use any fixed ID in this example and the code is commented out.
    # timeseries = session.read_timeseries_points(
    #     target=timeseries_attribute_id, start_time=start, end_time=end
    # )
    # print(f"Read {timeseries.number_of_points} points using time series attribute ID.")
    # print(timeseries.arrow_table.to_pandas())


def main(address, tls_root_pem_cert):
    """Showing how to get time series points."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        read_timeseries_points(session)


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)
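
Regarding the chunking advice in the comments above, a minimal sketch of splitting a long interval into shorter reads (the 30-day chunk length is illustrative):

from datetime import timedelta


def read_timeseries_points_chunked(session, target, start_time, end_time):
    """Sketch: read a long interval in shorter chunks to stay below the
    gRPC inbound message size limit."""
    chunk_length = timedelta(days=30)
    tables = []
    chunk_start = start_time
    while chunk_start < end_time:
        chunk_end = min(chunk_start + chunk_length, end_time)
        timeseries = session.read_timeseries_points(
            target=target, start_time=chunk_start, end_time=chunk_end
        )
        # Note: for time series with linear curve type Mesh also returns one
        # point after the interval, so adjacent chunks may overlap by one point.
        tables.append(timeseries.arrow_table)
        chunk_start = chunk_end
    return tables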

Read time series, asynchronously

import asyncio
import uuid
from datetime import datetime

import helpers

from volue.mesh.aio import Connection


async def read_timeseries_points_async(session: Connection.Session):
    """Showing how to read time series points."""

    # Define the time series identifier, it can be:
    # - time series key of a physical time series
    # - path of a time series attribute that is connected to a physical time series
    # - ID of a time series attribute that is connected to a physical time series
    timeseries_key = 3
    timeseries_attribute_path = "Model/SimpleThermalTestModel/ThermalComponent/SomePowerPlant1/SomePowerPlantChimney2.TsRawAtt"
    timeseries_attribute_id = uuid.UUID("e5df77a9-8b60-4b0a-aa1b-3c3957c538a0")

    # Defining a time interval to read time series from.
    # If no time zone is provided then it will be treated as UTC.
    start = datetime(2016, 1, 1, 6, 0, 0)
    end = datetime(2016, 1, 1, 8, 0, 0)

    # Each time series point occupies 20 bytes. By default gRPC has a 4MB inbound message size limit.
    # For larger data volumes please read the data in chunks,
    # e.g. call `read_timeseries_points` multiple times with shorter intervals.

    # Send a request to read time series based on the time series key.
    timeseries = await session.read_timeseries_points(
        target=timeseries_key, start_time=start, end_time=end
    )
    print(f"Read {timeseries.number_of_points} points using time series key.")
    print(timeseries.arrow_table.to_pandas())

    # Send a request to read time series based on the time series attribute path.
    timeseries = await session.read_timeseries_points(
        target=timeseries_attribute_path, start_time=start, end_time=end
    )
    print(
        f"Read {timeseries.number_of_points} points using time series attribute path."
    )
    print(timeseries.arrow_table.to_pandas())

    # Send a request to read time series based on the time series attribute ID.
    # Attribute IDs are auto-generated when an object is created.
    # That is why we can't use any fixed ID in this example and the code is commented out.
    # timeseries = await session.read_timeseries_points(
    #     target=timeseries_attribute_id, start_time=start, end_time=end
    # )
    # print(f"Read {timeseries.number_of_points} points using time series attribute ID.")
    # print(timeseries.arrow_table.to_pandas())


async def main(address, tls_root_pem_cert):
    """Showing how to get time series points asynchronously."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    async with connection.create_session() as session:
        await read_timeseries_points_async(session)


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    asyncio.run(main(address, tls_root_pem_cert))

Read and process time series, asynchronously

import asyncio
from datetime import datetime

import grpc
import helpers

from volue.mesh.aio import Connection


async def process_timeseries_values(arrow_table):
    # the processing can also be an async IO call
    # to e.g. a separate service
    await asyncio.sleep(8)
    print(f"Processing completed - {len(arrow_table)} points were processed")


async def read_timeseries_points(session, path):
    start_time = datetime(2016, 5, 1)
    end_time = datetime(2016, 5, 4)

    # the read operation can be long running;
    # with the asyncio API we can switch to doing something else
    # while waiting for the read operation to complete
    # (e.g. processing already returned time series)
    timeseries_read = await session.read_timeseries_points(
        target=path, start_time=start_time, end_time=end_time
    )

    return timeseries_read.arrow_table


async def handle_timeseries(session, path):
    arrow_table = await read_timeseries_points(session, path)
    await process_timeseries_values(arrow_table)


async def main(address, tls_root_pem_cert):
    """
    Showing how to use asynchronous connection in a real-world scenario.
    First multiple time series are returned that match a given query.
    Then each time series is read and some processing is applied.

    Note: Mesh does not yet handle parallel requests, every request is handled
          sequentially.

    This example works with the `SimpleThermalTestModel` test model, where 2 time
    series are returned for the given query. Assume reading time series takes
    10 seconds and processing (or some different computation, e.g.
    neural network inference based on that input) takes 8 seconds.
    When waiting for the read operation to complete for the second time series
    we can already start processing the first time series. See:

    Tr = 10s (reading)
    Tp = 8s  (processing)

    Asynchronous code (with Mesh handling requests sequentially):
    |   Tr1  ||  Tp1 |
    ------------------
              |   Tr2  ||  Tp2 |
              ------------------
    ---------- 28s -------------

    Whereas for synchronous code:
    |   Tr1  ||  Tp1 |
    ------------------
                      |   Tr2  ||  Tp2 |
                      ------------------
    ---------------  36s ---------------

    For 2 time series we save ~22% of the time.
    For 10 time series it would be 40% (108s instead of 180s).

    Note: the processing could also be done using async IO
          (e.g. requests sent to a different service).
    """

    query = "*.TsRawAtt"
    start_object_path = "Model/SimpleThermalTestModel/ThermalComponent"

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    async with connection.create_session() as session:
        try:
            timeseries_attributes = await session.search_for_timeseries_attributes(
                target=start_object_path, query=query
            )
        except grpc.RpcError as e:
            print(f"Could not find timeseries attribute: {e}")
            return

        print(f"Number of found time series: {len(timeseries_attributes)}")
        await asyncio.gather(
            *(
                handle_timeseries(session, timeseries_attribute.path)
                for timeseries_attribute in timeseries_attributes
            )
        )


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    asyncio.run(main(address, tls_root_pem_cert))

Search for time series attributes

import helpers

from volue.mesh import Connection, OwnershipRelationAttribute, TimeseriesAttribute


def search_method_1(session: Connection.Session):
    """
    This method uses `search_for_timeseries_attributes` function.
    Wildcard search expressions for attributes are not supported, meaning we
    can't get all time series attributes using simply a "*" search expression.

    Useful e.g.: when searching for attributes with known names.
    """
    print("Search method 1")

    # Specify what you want to search for

    # The Mesh object to start searching from
    start_object_path = "Model/SimpleThermalTestModel/ThermalComponent"
    # OR
    # start_object_guid = uuid.UUID("0000000b-0001-0000-0000-000000000000")  # ThermalComponent

    # The query expressed using Mesh search language syntax
    # Traverse all children ({*}) of the start object and accept all that have
    # an attribute (.) called TsRawAtt
    query = "{*}.TsRawAtt"

    # Search for time series attributes using this query
    timeseries_attributes = session.search_for_timeseries_attributes(
        target=start_object_path, query=query
    )

    print(f"Number of found time series: {len(timeseries_attributes)}")
    for attribute in timeseries_attributes:
        if isinstance(attribute, TimeseriesAttribute):
            print(attribute.path)


def search_method_2(session: Connection.Session):
    """
    This method uses `search_for_objects` function with wildcard search
    expression ("*") that returns all objects.
    Useful when you want to traverse the complete model.

    Additionally, here we show how to distinguish whether a time series
    attribute has a time series calculation expression defined or a physical
    time series connected to it.
    """
    print("Search method 2")

    # Provide root object (also referred to as a model) as the start object to
    # run the search.
    start_object_path = "Model/SimpleThermalTestModel"

    # Returns every object.
    query = "*"

    # Depending on the model size this may take a long time to complete.
    objects = session.search_for_objects(target=start_object_path, query=query)

    print(f"Number of found objects: {len(objects)}")
    for object in objects:
        print(object.name)
        for attribute in object.attributes.values():
            if isinstance(attribute, TimeseriesAttribute):
                # If time series resource is set, then it means a physical
                # time series is connected to the attribute.
                if attribute.time_series_resource is not None:
                    print(
                        f"{attribute.path} has physical time series connected with time series key {attribute.time_series_resource.timeseries_key}"
                    )


def search_method_3(session: Connection.Session):
    """
    This method uses `get_object` function.
    Useful when you want to traverse a subset of the model tree from a given
    object.
    """
    print("Search method 3")

    def traverse_child_objects(session: Connection.Session, target):
        object = session.get_object(target, full_attribute_info=True)

        for attribute in object.attributes.values():
            if isinstance(attribute, OwnershipRelationAttribute):
                for child_id in attribute.target_object_ids:
                    traverse_child_objects(session, child_id)

            if isinstance(attribute, TimeseriesAttribute):
                print(attribute.path)

    object_path = "Model/SimpleThermalTestModel/ThermalComponent/SomePowerPlant1"
    traverse_child_objects(session, object_path)


def main(address, tls_root_pem_cert):
    """Showing how to search for Mesh time series attributes in various ways."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    # Create a remote session on the Volue Mesh server.
    with connection.create_session() as session:
        search_method_1(session)
        search_method_2(session)
        search_method_3(session)


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)

Traverse model

import helpers

from volue.mesh import Connection, OwnershipRelationAttribute

leaves = []


def traverse_model_top_down(session: Connection.Session, target, depth=0):
    """Traverses the Mesh model recursively."""
    object = session.get_object(target)
    print(f"{'..' * depth}{object.name}")
    leaf = True

    for attr in object.attributes.values():
        if isinstance(attr, OwnershipRelationAttribute):
            for child_id in attr.target_object_ids:
                leaf = False
                traverse_model_top_down(session, child_id, depth + 1)
    if leaf:
        leaves.append(object)


def traverse_model_bottom_up(session: Connection.Session, target, model):
    object = session.get_object(target)
    depth = object.path.count("/") - 1
    print(f"{'..' * depth}{object.name}")
    if object.owner_id == model.id:
        print(model.name)
        return
    attribute = session.get_attribute(object.owner_id)
    traverse_model_bottom_up(session, attribute.owner_id, model)


def main(address, tls_root_pem_cert):
    """Showing how to traverse Mesh model."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        models = session.list_models()
        for model in models:
            leaves.clear()
            print(f"\nModel: '{model.name}'")
            print("Top-bottom traversal:")
            traverse_model_top_down(session, model.id)
            # Expected output:
            # Model
            # ..ChildObject1
            # ....SubChildObject1
            # ....SubChildObject2
            # ..ChildObject2
            print("\nBottom-top traversal:")
            traverse_model_bottom_up(session, leaves[0].id, model)
            # Expected output:
            # ....SubChildObject1
            # ..ChildObject1
            # Model


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)

Using time series with pandas

from datetime import datetime

import grpc
import helpers
import pandas as pd
import pyarrow as pa
from dateutil import tz

from volue.mesh import Connection, Timeseries
from volue.mesh.calc import transform as Transform
from volue.mesh.calc.common import Timezone


def main(address, tls_root_pem_cert):
    """Showing how to find time series, write, read points from it and convert them to pandas format."""

    query = "*[.Name=SomePowerPlantChimney2].TsRawAtt"  # make sure only 1 time series is returned
    start_object_path = "Model/SimpleThermalTestModel/ThermalComponent"

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        # first let's find a time series in our model
        try:
            timeseries_attributes = session.search_for_timeseries_attributes(
                start_object_path, query
            )
        except grpc.RpcError as e:
            print(f"Could not find time series attribute: {e}")
            return

        if len(timeseries_attributes) == 0:
            print("No such time series attribute in the given model/database")
            return

        print(f"Number of found time series: {len(timeseries_attributes)}")

        # pick the first time series and do some operations with it
        timeseries_attribute = timeseries_attributes[0]
        print("Working on timeseries with path: " + timeseries_attribute.path)

        # check for example the curve type of the connected physical time series
        print(f"Curve: {timeseries_attribute.time_series_resource.curve_type}")

        # now let's write some data to it
        try:
            # Mesh data is organized as an Arrow table with the following schema:
            # utc_time - [pa.timestamp('ms')] as a UTC Unix timestamp expressed in milliseconds
            # flags - [pa.uint32]
            # value - [pa.float64]

            number_of_points = 72
            timestamps = []
            values = []
            for i in range(0, number_of_points):
                hours = i % 24
                days = int(i / 24) + 1
                timestamps.append(
                    datetime(2016, 5, days, hours)
                )  # if no time zone is provided then the timestamp is treated as UTC
                values.append(days * 10)

            flags = [Timeseries.PointFlags.OK.value] * number_of_points

            arrays = [pa.array(timestamps), pa.array(flags), pa.array(values)]
            arrow_table = pa.Table.from_arrays(arrays, schema=Timeseries.schema)

            timeseries = Timeseries(
                table=arrow_table, full_name=timeseries_attribute.path
            )
            session.write_timeseries_points(timeseries)

        except grpc.RpcError as e:
            print(f"Could not write timeseries points: {e}")

        local_time_zone = tz.tzlocal()

        # now let's read from it
        try:
            # let's use the local time zone (read from operating system settings)
            start_time = datetime(2016, 5, 1, tzinfo=local_time_zone)
            end_time = datetime(2016, 5, 4, tzinfo=local_time_zone)

            timeseries_read = session.read_timeseries_points(
                target=timeseries_attribute, start_time=start_time, end_time=end_time
            )

            # convert to pandas format
            # the timestamps in the PyArrow table are always returned in UTC
            pandas_series = timeseries_read.arrow_table.to_pandas()

            # let's convert it back to the local time zone
            # first convert to a UTC time zone aware datetime object and then to the local time zone (set in the operating system)
            pandas_series["utc_time"] = pd.to_datetime(
                pandas_series["utc_time"], utc=True
            ).dt.tz_convert(local_time_zone)
            print(pandas_series)

            # notice that depending on the local time zone there is a shift in the data,
            # e.g. for the UTC+2 time zone the first 2 values will be NaN, because the time series points
            # written in the previous step used time zone naive datetime objects, so they were treated as UTC

            # do some further processing

        except grpc.RpcError as e:
            print(f"Could not read timeseries points: {e}")

        # now let's read transformations from it (transform resolution to days)
        print("Transform resolution to days:")
        try:
            start_time = datetime(2016, 5, 1, tzinfo=local_time_zone)
            end_time = datetime(2016, 5, 4, tzinfo=local_time_zone)

            # Transform function may take optionally a time zone argument.
            # Refer to `transform` documentation for more details.
            # If you are using `LOCAL` or `STANDARD` time zone then make sure
            # the Mesh server is operating in the same time zone or adjust properly.
            transformed_timeseries = session.transform_functions(
                timeseries_attribute, start_time, end_time
            ).transform(Timeseries.Resolution.DAY, Transform.Method.SUM, Timezone.LOCAL)

            # convert to pandas format
            # the timestamps in the PyArrow table are always returned in UTC
            pandas_series = transformed_timeseries.arrow_table.to_pandas()
            print(pandas_series)

            # let's convert it back to the local time zone
            # first convert to a UTC time zone aware datetime object and then to the local time zone (set in the operating system)
            pandas_series["utc_time"] = pd.to_datetime(
                pandas_series["utc_time"], utc=True
            ).dt.tz_convert(local_time_zone)
            print(pandas_series)

            # do some further processing

        except Exception as e:
            print(f"Could not read transformed timeseries points: {e}")

        # optionally discard changes
        session.rollback()


if __name__ == "__main__":
    # This will search for a given time series, write some data,
    # read it and convert to pandas format.

    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)

Working with model (objects and attributes)

import helpers

from volue.mesh import AttributesFilter, Connection, OwnershipRelationAttribute


def main(address, tls_root_pem_cert):
    root_object_path = "Model/SimpleThermalTestModel/ThermalComponent"

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        # The root object has an ownership relation attribute that points to objects
        # of "PlantElementType" type. We want to add a new object of this type.
        root_object = session.get_object(root_object_path, full_attribute_info=True)

        # First get the ownership attribute:
        # If we know the name of the attribute we can use:
        ownership_relation_attribute = root_object.attributes["ThermalPowerToPlantRef"]

        # If we don't know the name, but only the type of the target object
        # (the object it points to):
        # Note: This requires `full_attribute_info` flag set to True when
        #       calling `get_object` or `get_attribute`.
        for attribute in root_object.attributes.values():
            if (
                isinstance(attribute, OwnershipRelationAttribute)
                and attribute.definition.target_object_type_name == "PlantElementType"
            ):
                ownership_relation_attribute = attribute

        new_object = session.create_object(
            ownership_relation_attribute, "SomeNewPowerPlant"
        )
        print(f"New object created: {new_object.path}")

        int_attribute = new_object.attributes["Int64Att"]

        # Object returned by `create_object` contains all attributes with basic
        # information.
        print("One of the new object's attributes (basic information):")
        print(int_attribute)

        # Now let's update the attribute's value.
        session.update_simple_attribute(int_attribute, 100)

        # Check updated value, but this time get also more information like
        # attribute definition. Use either:
        # - `get_object` with `full_attribute_info` flag
        # - `get_attribute` with `full_attribute_info` flag
        attribute_with_full_info = session.get_attribute(
            int_attribute, full_attribute_info=True
        )
        print("One of the new object's attributes (full information):")
        print(attribute_with_full_info)

        # We can also filter attributes by e.g. name.
        object_with_filtered_attributes = session.get_object(
            new_object,
            attributes_filter=AttributesFilter(name_mask=["DblAtt", "StringAtt"]),
        )
        print(
            f"Filtered attributes count: {len(object_with_filtered_attributes.attributes)}"
        )

        # Connect a time series resource to a time series attribute of the new object.
        new_timeseries_resource_key = 2
        session.update_timeseries_attribute(
            new_object.attributes["TsRawAtt"],
            new_timeseries_resource_key=new_timeseries_resource_key,
        )

        # Now disconnect the time series resource by setting
        # `new_timeseries_resource_key` to 0.
        session.update_timeseries_attribute(
            new_object.attributes["TsRawAtt"],
            new_timeseries_resource_key=0,
        )

        # Change local expression of a time series attribute of the new object.
        # The local expression is not validated during update operation.
        # Invalid local expressions will be accepted but will cause failures
        # when reading time series points later. Before committing changes,
        # make sure the expression is valid, by reading time series points and
        # verifying they are as expected.
        new_local_expression = "## = 2\n"
        session.update_timeseries_attribute(
            new_object.attributes["TsCalcAtt"],
            new_local_expression=new_local_expression,
        )

        # We can also update the template expression of a time series attribute definition,
        # which affects all attribute instances using that definition.
        ts_calc_attribute = session.get_timeseries_attribute(
            new_object.attributes["TsCalcAtt"], full_attribute_info=True
        )
        # The template expression is not validated during update operation.
        # Invalid template expressions will be accepted but will cause failures
        # when reading time series points later - if there is no local
        # expression set that takes precedence. In such case, before committing
        # changes, make sure the expression is valid, by reading time series
        # points and verifying they are as expected.
        new_template_expression = "## = 100\n"
        session.update_timeseries_attribute_definition(
            ts_calc_attribute.definition,
            new_template_expression=new_template_expression,
        )
        # Note: Updating the template expression affects ALL attributes that use
        # this definition across all objects in the model (unless they have a
        # local expression override).

        # Now let's change the object's name.
        session.update_object(new_object, new_name="NewNamePowerPlant")
        print("Object's name changed.")

        # Delete the updated object.
        # Because we changed the object's name, we can't provide the old
        # `Object` instance, as it still has the old object name.
        # Use the object's ID instead or provide the path to the object explicitly.
        session.delete_object(new_object.id)
        print("Object deleted.")

        # To commit your changes you need to call:
        # session.commit()


if __name__ == "__main__":
    args = helpers.get_connection_info()
    main(*args)
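
Since local and template expressions are not validated during the update calls shown above, one way to sanity-check an expression before committing is to read calculation results back. A minimal sketch (the time interval is illustrative):

from datetime import datetime

import grpc


def expression_seems_valid(session, calc_attribute_target):
    """Sketch: try reading points from a calculation time series attribute;
    an invalid expression is expected to surface as a gRPC error."""
    try:
        timeseries = session.read_timeseries_points(
            target=calc_attribute_target,
            start_time=datetime(2016, 1, 1),
            end_time=datetime(2016, 1, 2),
        )
        print(f"Expression evaluated, read {timeseries.number_of_points} points.")
        return True
    except grpc.RpcError as e:
        print(f"Expression looks invalid: {e}")
        return False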

Working with rating curves

from datetime import datetime

import helpers

from volue.mesh import Connection, RatingCurveSegment, RatingCurveVersion


def main(address, tls_root_pem_cert):
    rating_curve_attribute_path = "Model/SimpleThermalTestModel/ThermalComponent.ThermalPowerToPlantRef/SomePowerPlant1.RatingCurveAtt"

    # Defining a time interval to read rating curve versions from.
    # If no time zone is provided then it will be treated as UTC.
    start_time = datetime(2008, 1, 1)
    end_time = datetime(2022, 1, 1)

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        # First read the attribute using `get_attribute`.
        # We can get standard information like name, ID, tags, etc.
        rating_curve_attribute = session.get_attribute(
            rating_curve_attribute_path, full_attribute_info=True
        )
        print(
            f"Basic information about the rating curve attribute: {rating_curve_attribute}\n"
        )

        # Because the rating curve can potentially contain large amounts of data,
        # specialized methods exist to handle those values.
        versions = session.get_rating_curve_versions(
            target=rating_curve_attribute_path, start_time=start_time, end_time=end_time
        )

        print(
            (
                "Rating curve versions for time interval: "
                f"{start_time.strftime('%d.%m.%Y')} - {end_time.strftime('%d.%m.%Y')}:"
            )
        )
        for i, version in enumerate(versions):
            print(f"Version {i+1}:\n{version}")

        if len(versions) == 0 or len(versions[-1].x_value_segments) == 0:
            print("No rating curve versions found. Skip update.")
            return

        # Now, for the last version, update the first segment and add a new one.
        versions[-1].x_value_segments[0].factor_b = -3.2
        versions[-1].x_value_segments.append(RatingCurveSegment(25, -1.1, 2.2, -3.3))
        session.update_rating_curve_versions(
            target=rating_curve_attribute_path,
            start_time=versions[
                0
            ].valid_from_time,  # use first version `valid_from_time` as start interval
            end_time=datetime.max,  # this is the last version
            new_versions=versions,
        )

        # Read once again.
        versions = session.get_rating_curve_versions(
            target=rating_curve_attribute_path, start_time=start_time, end_time=end_time
        )

        print("Updated rating curve versions:")
        for i, version in enumerate(versions):
            print(f"Version {i+1}:\n{version}")

        # Now let's replace the last version with a new one.
        new_segments = [
            RatingCurveSegment(10, 1.1, 1.9, -3.5),
            RatingCurveSegment(21, 2.1, 1.8, -3.2),
        ]

        new_version = RatingCurveVersion(
            valid_from_time=datetime(2010, 6, 1),
            x_range_from=5.0,
            x_value_segments=new_segments,
        )

        session.update_rating_curve_versions(
            target=rating_curve_attribute_path,
            start_time=versions[
                -1
            ].valid_from_time,  # to replace old version use its `valid_from_time` as start interval
            end_time=datetime.max,  # this is the last version
            new_versions=[new_version],
        )

        # Read once again.
        versions = session.get_rating_curve_versions(
            target=rating_curve_attribute_path, start_time=start_time, end_time=end_time
        )

        print("Updated for the second time rating curve versions:")
        for i, version in enumerate(versions):
            print(f"Version {i+1}:\n{version}")

        # To commit your changes you need to call:
        # session.commit()


if __name__ == "__main__":
    args = helpers.get_connection_info()
    main(*args)

Working with sessions

import uuid

import helpers

from volue.mesh import Connection


def main(address, tls_root_pem_cert):
    """Showing different ways of working with sessions."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    # 1. Create a session, open and close session.
    session = connection.create_session()
    session.open()
    print("1. You now have a new open session")
    session.close()

    # 2. Create session using the with statement.
    # Session will be created, opened and closed within the 'with' statement scope
    with connection.create_session() as session:
        print("2. You now have a new open session")

    # 3. Connecting to a potentially open session.
    # Session ids can be found in the session object:
    # session.session_id
    # Note: If the session_id is not the id of an open session,
    # the server will create a new one for you.
    # Set the session id you want to connect to
    session_id = uuid.UUID("123e4567-e89b-12d3-a456-556642440000")
    print(f"3. Session id you want to connect to: {session_id}")
    session = connection.connect_to_session(session_id)
    # Try connecting to that session id; if it does not exist, a new one will be created without warning
    session.open()
    print(
        "3. You have now an open session, either the one you requested or a new one if it did not exist"
    )
    # Check which session you are connected to and close it
    print(f"3. Session id you are to connect to: {session.session_id}")
    session.close()


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)

Working with XY sets

import datetime

import helpers

from volue import mesh

OBJECT_PATH = "Model/SimpleThermalTestModel/ThermalComponent.ThermalPowerToPlantRef/SomePowerPlant1"
UNVERSIONED_PATH = OBJECT_PATH + ".XYSetAtt"
VERSIONED_PATH = OBJECT_PATH + ".XYZSeriesAtt"


def main(address, tls_root_pem_cert):
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(address, tls_root_pem_cert)
    connection = mesh.Connection.insecure(address)

    with connection.create_session() as session:
        # In the default test model both the versioned and the unversioned
        # attribute are initially empty. The following two calls are therefore
        # expected to return [].

        print(f"Getting XY-sets from {UNVERSIONED_PATH}")
        xy_sets = session.get_xy_sets(UNVERSIONED_PATH)
        print(xy_sets)

        start_time = datetime.datetime.fromisoformat("1960-01-01")
        end_time = datetime.datetime.fromisoformat("2030-01-01")
        print(
            f"Getting XY-sets in interval [{start_time}, {end_time}) from {VERSIONED_PATH}"
        )
        xy_sets = session.get_xy_sets(VERSIONED_PATH, start_time, end_time)
        print(f"Received: {xy_sets}")

        sample_xy_set = mesh.XySet(
            None, [mesh.XyCurve(0.0, [(1.0, 10.3), (2.5, 15.9)])]
        )

        print(f"Updating XY-set at {UNVERSIONED_PATH}")
        session.update_xy_sets(UNVERSIONED_PATH, new_xy_sets=[sample_xy_set])
        print("Updated")

        print(
            f"Replacing XY-set versions in interval [{start_time}, {end_time}) with one version at {start_time}"
        )
        sample_xy_set.valid_from_time = start_time
        session.update_xy_sets(VERSIONED_PATH, start_time, end_time, [sample_xy_set])
        print("Updated")

        print(f"Getting XY-sets from {UNVERSIONED_PATH}")
        xy_sets = session.get_xy_sets(UNVERSIONED_PATH)
        print(f"Received: {xy_sets}")

        print(
            f"Getting XY-sets in interval [{start_time}, {end_time}) from {VERSIONED_PATH}"
        )
        xy_sets = session.get_xy_sets(VERSIONED_PATH, start_time, end_time)
        print(f"Received: {xy_sets}")


if __name__ == "__main__":
    args = helpers.get_connection_info()
    main(*args)

Write time series

import asyncio
import uuid
from datetime import datetime

import grpc
import helpers
import pandas as pd
import pyarrow as pa

import volue.mesh.aio
from volue import mesh

# Define the time series identifier, it can be:
# - time series key of a physical time series
# - path of a time series attribute that is connected to a physical time series
# - ID of a time series attribute that is connected to a physical time series
timeseries_key = 3
timeseries_attribute_path = "Model/SimpleThermalTestModel/ThermalComponent/SomePowerPlant1/SomePowerPlantChimney2.TsRawAtt"
timeseries_attribute_id = uuid.UUID("e5df77a9-8b60-4b0a-aa1b-3c3957c538a0")

# time_zone = "Europe/Warsaw"
time_zone = "UTC"


def get_pa_table_from_pa_arrays() -> pa.Table:
    """Create a sample PyArrow Table with time series points from PyArrow arrays."""

    # Defining the data we want to write.
    # Mesh data is organized as an Arrow table with the following schema:
    # utc_time - [pa.timestamp('ms')] as a UTC Unix timestamp expressed in milliseconds
    # flags - [pa.uint32]
    # value - [pa.float64]

    number_of_points = 3

    timestamps = pd.date_range(
        datetime(2016, 1, 1, 1), periods=number_of_points, freq="1h", tz=time_zone
    )
    flags = [mesh.Timeseries.PointFlags.OK.value] * number_of_points
    values = [0.0, 10.0, 1000.0]

    arrays = [
        # if no time zone is provided then the timestamp is treated as UTC
        pa.array(timestamps),
        pa.array(flags),
        pa.array(values),
    ]
    return pa.Table.from_arrays(arrays, schema=mesh.Timeseries.schema)


def get_pa_table_from_pandas() -> pa.Table:
    """Create a sample PyArrow Table with time series points from pandas DataFrame."""

    # Defining the data we want to write.
    # Mesh data is organized as an Arrow table with the following schema:
    # utc_time - [pa.timestamp('ms')] as a UTC Unix timestamp expressed in milliseconds
    # flags - [pa.uint32]
    # value - [pa.float64]

    number_of_points = 3

    timestamps = pd.date_range(
        datetime(2016, 1, 1, 1), periods=number_of_points, freq="1h", tz=time_zone
    )
    flags = [mesh.Timeseries.PointFlags.OK.value] * number_of_points
    values = [1.1, 22.2, 333.3]

    df = pd.DataFrame(
        {
            mesh.Timeseries.TIMESTAMP_PA_FIELD_NAME: timestamps,
            mesh.Timeseries.FLAGS_PA_FIELD_NAME: flags,
            mesh.Timeseries.VALUE_PA_FIELD_NAME: values,
        }
    )

    return pa.Table.from_pandas(df, schema=mesh.Timeseries.schema)


def sync_write_timeseries_points(address, tls_root_pem_cert):
    print("Synchronous write time series points:")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(address, tls_root_pem_cert)
    connection = mesh.Connection.insecure(address)

    with connection.create_session() as session:
        table = get_pa_table_from_pandas()

        # Each time series point occupies 20 bytes. The Mesh server has a 4MB inbound message size limit.
        # For larger data volumes please send the input data in chunks,
        # e.g. call `write_timeseries_points` multiple times with shorter intervals.

        # Send a request to write time series based on the time series key.
        timeseries = mesh.Timeseries(table=table, timskey=timeseries_key)
        session.write_timeseries_points(timeseries=timeseries)
        print("Time series points written using time series key.")

        # To only remove time series points we need to provide an empty PyArrow table, but with the correct schema.
        empty_table = mesh.Timeseries.schema.empty_table()

        # If `start_time` and `end_time` are not provided explicitly they will be taken from the PyArrow `table`.
        # Because we are providing an empty table we must specify them explicitly.
        # For this interval all existing points will be removed.
        #
        # End time is exclusive so from the 3 points written by timeseries key,
        # 2 points will be removed by this call.
        #
        # Send a request to write time series based on the time series attribute path/full_name.
        timeseries = mesh.Timeseries(
            table=empty_table,
            start_time=datetime(2016, 1, 1, 1),
            end_time=datetime(2016, 1, 1, 3),
            full_name=timeseries_attribute_path,
        )
        session.write_timeseries_points(timeseries=timeseries)
        print("Time series points written using time series attribute path.")

        # Let's check it. We should get just one point.
        # Note:
        # - the `timeseries_attribute_path` and `timeseries_key` both point to the same time series attribute.
        # - the `end_time` in `read_timeseries_points` is also exclusive,
        #   but for time series with linear curve type Mesh also returns one point after the interval.
        timeseries = session.read_timeseries_points(
            target=timeseries_key,
            start_time=datetime(2016, 1, 1, 1),
            end_time=datetime(2016, 1, 1, 3),
        )
        print(f"Read {timeseries.number_of_points} points using time series key.")
        print(timeseries.arrow_table.to_pandas())

        # Send a request to write time series based on the time series attribute ID.
        # Attribute IDs are auto-generated when an object is created.
        # That is why we can't use any fixed ID in this example and this call will fail.
        try:
            timeseries = mesh.Timeseries(table=table, uuid_id=timeseries_attribute_id)
            session.write_timeseries_points(timeseries=timeseries)
            print("Time series points written using time series attribute ID.")
        except grpc.RpcError as e:
            print(
                f"failed to write time series points based on time series attribute ID: {e}"
            )
        # Commit the changes to the database.
        # session.commit()

        # Or discard changes.
        session.rollback()


async def async_write_timeseries_points(
    address,
    tls_root_pem_cert,
):
    print("Asynchronous write time series points:")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.aio.Connection.with_tls(address, tls_root_pem_cert)
    connection = mesh.aio.Connection.insecure(address)

    async with connection.create_session() as session:
        table = get_pa_table_from_pa_arrays()

        # Each time series point occupies 20 bytes. The Mesh server has a 4MB inbound message size limit.
        # For larger data volumes please send the input data in chunks,
        # e.g. call `write_timeseries_points` multiple times with shorter intervals.

        # Send a request to write time series based on the time series key.
        timeseries = mesh.Timeseries(table=table, timskey=timeseries_key)
        await session.write_timeseries_points(timeseries=timeseries)
        print("Time series points written using time series key.")

        # To only remove time series points we need to provide an empty PyArrow table, but with the correct schema.
        empty_table = mesh.Timeseries.schema.empty_table()

        # If `start_time` and `end_time` are not provided explicitly they will be taken from the PyArrow `table`.
        # Because we are providing an empty table we must specify them explicitly.
        # For this interval all existing points will be removed.
        #
        # End time is exclusive so from the 3 points written by timeseries key,
        # 2 points will be removed by this call.
        #
        # Send a request to write time series based on the time series attribute path/full_name.
        timeseries = mesh.Timeseries(
            table=empty_table,
            start_time=datetime(2016, 1, 1, 1),
            end_time=datetime(2016, 1, 1, 3),
            full_name=timeseries_attribute_path,
        )
        await session.write_timeseries_points(timeseries=timeseries)
        print("Time series points written using time series attribute path.")

        # Let's check it. We should get just one point.
        # Note:
        # - the `timeseries_attribute_path` and `timeseries_key` both point to the same time series attribute.
        # - the `end_time` in `read_timeseries_points` is also exclusive,
        #   but for time series with linear curve type Mesh also returns one point after the interval.
        timeseries = await session.read_timeseries_points(
            target=timeseries_key,
            start_time=datetime(2016, 1, 1, 1),
            end_time=datetime(2016, 1, 1, 3),
        )
        print(f"Read {timeseries.number_of_points} points using time series key.")
        print(timeseries.arrow_table.to_pandas())

        # Send a request to write time series based on the time series attribute ID.
        # Attribute IDs are auto-generated when an object is created.
        # That is why we can't use any fixed ID in this example and this call will fail.
        try:
            timeseries = mesh.Timeseries(table=table, uuid_id=timeseries_attribute_id)
            await session.write_timeseries_points(timeseries=timeseries)
            print("Time series points written using time series attribute ID.")
        except grpc.RpcError as e:
            print(
                f"failed to write time series points based on time series attribute ID: {e}"
            )

        # Commit the changes to the database.
        # await session.commit()

        # Or discard changes.
        await session.rollback()


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    sync_write_timeseries_points(
        address,
        tls_root_pem_cert,
    )
    asyncio.run(async_write_timeseries_points(address, tls_root_pem_cert))

Run simulations

import asyncio
import logging
from datetime import datetime, timedelta

import helpers

import volue.mesh.aio
from volue import mesh

GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES = 10 * 1024 * 1024  # 10MB


def sync_run_simulation(address, tls_root_pem_cert):
    print("connecting...")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(
    #     address,
    #     tls_root_pem_cert,
    #     grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    # )

    # By default the maximum inbound gRPC message size is 4MB. When the Mesh
    # server returns datasets for longer simulation intervals, the gRPC
    # message size may exceed this limit. In such cases the user can set a new
    # limit using `grpc_max_receive_message_length` when creating a connection to Mesh.
    connection = mesh.Connection.insecure(
        address,
        grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    )

    with connection.create_session() as session:
        start_time = datetime(2023, 11, 1)
        end_time = datetime(2023, 11, 2)

        print("running simulation...")

        try:
            for response in session.run_simulation(
                "Mesh",
                "Cases/Demo",
                start_time,
                end_time,
                return_datasets=True,
                resolution=timedelta(minutes=5),
            ):
                if isinstance(response, mesh.LogMessage):
                    print(
                        f"[{logging.getLevelName(response.level)}] {response.message}"
                    )
                elif isinstance(response, mesh.HydSimDataset):
                    print(
                        f"Received dataset {response.name} with {len(response.data)} bytes"
                    )
            print("done")
        except Exception as e:
            print(f"failed to run simulation: {e}")


async def async_run_simulation(address, tls_root_pem_cert):
    print("connecting...")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.aio.Connection.with_tls(
    #     address,
    #     tls_root_pem_cert,
    #     grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    # )

    # By default the maximum inbound gRPC message size is 4MB. When the Mesh
    # server returns datasets for longer simulation intervals, the gRPC
    # message size may exceed this limit. In such cases the user can set a new
    # limit using `grpc_max_receive_message_length` when creating a connection to Mesh.
    connection = mesh.aio.Connection.insecure(
        address,
        grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    )

    async with connection.create_session() as session:
        start_time = datetime(2023, 11, 1)
        end_time = datetime(2023, 11, 2)

        print("running simulation...")

        try:
            async for response in session.run_simulation(
                "Mesh",
                "Cases/Demo",
                start_time,
                end_time,
                return_datasets=True,
                resolution=timedelta(minutes=5),
            ):
                if isinstance(response, mesh.LogMessage):
                    print(
                        f"[{logging.getLevelName(response.level)}] {response.message}"
                    )
                elif isinstance(response, mesh.HydSimDataset):
                    print(
                        f"Received dataset {response.name} with {len(response.data)} bytes"
                    )
            print("done")
        except Exception as e:
            print(f"failed to run simulation: {e}")


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    sync_run_simulation(address, tls_root_pem_cert)
    asyncio.run(async_run_simulation(address, tls_root_pem_cert))
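
The datasets streamed back as `mesh.HydSimDataset` (by `run_simulation` above and by `run_inflow_calculation` below) carry raw bytes. If they need to be inspected or shared, a minimal sketch is to replace the dataset branch in the loops with a write to disk (the file name here is illustrative):

    elif isinstance(response, mesh.HydSimDataset):
        with open(f"{response.name}.dataset", "wb") as file:
            file.write(response.data)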

Run inflow calculations

import asyncio
import logging
from datetime import datetime, timedelta

import helpers

import volue.mesh.aio
from volue import mesh

GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES = 10 * 1024 * 1024  # 10MB


def sync_run_inflow_calculation(address, tls_root_pem_cert):
    print("connecting...")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(
    #     address,
    #     tls_root_pem_cert,
    #     grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    # )

    # By default the maximum inbound gRPC message size is 4MB. When the Mesh
    # server returns datasets for longer inflow calculation intervals, the
    # gRPC message size may exceed this limit. In such cases the user can set a new
    # limit using `grpc_max_receive_message_length` when creating a connection to Mesh.
    connection = mesh.Connection.insecure(
        address,
        grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    )

    with connection.create_session() as session:
        start_time = datetime(2021, 1, 1)
        end_time = datetime(2021, 1, 2)

        print("running inflow calculation...")

        try:
            for response in session.run_inflow_calculation(
                "Mesh",
                "Area",
                "WaterCourse",
                start_time,
                end_time,
                return_datasets=True,
                resolution=timedelta(minutes=5),
            ):
                if isinstance(response, mesh.LogMessage):
                    print(
                        f"[{logging.getLevelName(response.level)}] {response.message}"
                    )
                elif isinstance(response, mesh.HydSimDataset):
                    print(
                        f"Received dataset {response.name} with {len(response.data)} bytes"
                    )
            print("done")
        except Exception as e:
            print(f"failed to run inflow calculation: {e}")


async def async_run_inflow_calculation(address, tls_root_pem_cert):
    print("connecting...")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.aio.Connection.with_tls(
    #     address,
    #     tls_root_pem_cert,
    #     grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    # )

    # By default the maximum inbound gRPC message size is 4MB. When the Mesh
    # server returns datasets for longer inflow calculation intervals, the
    # gRPC message size may exceed this limit. In such cases the user can set a new
    # limit using `grpc_max_receive_message_length` when creating a connection to Mesh.
    connection = mesh.aio.Connection.insecure(
        address,
        grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    )

    async with connection.create_session() as session:
        start_time = datetime(2021, 1, 1)
        end_time = datetime(2021, 1, 2)

        print("running inflow calculation...")

        try:
            async for response in session.run_inflow_calculation(
                "Mesh",
                "Area",
                "WaterCourse",
                start_time,
                end_time,
                return_datasets=True,
                resolution=timedelta(minutes=5),
            ):
                if isinstance(response, mesh.LogMessage):
                    print(
                        f"[{logging.getLevelName(response.level)}] {response.message}"
                    )
                elif isinstance(response, mesh.HydSimDataset):
                    print(
                        f"Received dataset {response.name} with {len(response.data)} bytes"
                    )
            print("done")
        except Exception as e:
            print(f"failed to run inflow calculation: {e}")


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    sync_run_inflow_calculation(address, tls_root_pem_cert)
    asyncio.run(async_run_inflow_calculation(address, tls_root_pem_cert))

Working with availability events

from datetime import datetime

import dateutil
import helpers

from volue.mesh import Connection
from volue.mesh.availability import (
    EventType,
    Recurrence,
    RecurrenceType,
    RestrictionBasicRecurrence,
    RestrictionComplexRecurrence,
    Revision,
    TimePoint,
)

CHIMNEY_PATH = "Model/SimpleThermalTestModel/ThermalComponent/SomePowerPlant1/SomePowerPlantChimney2"


def revision_workflow(session: Connection.Session):
    """
    Demonstrates a complete workflow for creating, managing, and deleting revisions in Mesh.
    """
    print("\n=== Starting Revision Workflow Example ===\n")
    # 1. Create a revision
    print("1. Creating a new revision...")
    revision = session.availability.create_revision(
        target=CHIMNEY_PATH,
        event_id="event_id",
        local_id="local_id",
        reason="Revision reason",
    )

    print(f"   Created revision with ID: {revision.event_id}")
    print(f"   Owner ID: {revision.owner_id}")
    print(f"   Created by: {revision.created.author} at {revision.created.timestamp}")
    print(f"   Initial recurrences count: {len(revision.recurrences)}")

    # 2. Add a recurrence to the revision
    print("\n2. Adding a basic recurrence...")
    recurrence_id = session.availability.add_revision_recurrence(
        target=CHIMNEY_PATH,
        event_id=revision.event_id,
        period_start=datetime(2023, 1, 1, tzinfo=dateutil.tz.UTC),
        period_end=datetime(2023, 1, 2, tzinfo=dateutil.tz.UTC),
        recurrence=Recurrence(
            status="Planned",
            description="Recurrence",
            recurrence_type=RecurrenceType.NONE,
        ),
    )
    print(f"   Added recurrence with ID: {recurrence_id}")

    # 3. Get the revision
    print("\n3. Getting the revision with its new recurrence...")
    revision_with_recurrence = session.availability.get_availability_event(
        target=CHIMNEY_PATH,
        event_id=revision.event_id,
    )
    print(f"   Retrieved revision with ID: {revision_with_recurrence.event_id}")
    print(f"   Recurrences count: {len(revision_with_recurrence.recurrences)}")
    if revision_with_recurrence.recurrences:
        recurrence = revision_with_recurrence.recurrences[0]
        print(f"   Recurrence status: {recurrence.recurrence.status}")
        print(f"   Recurrence description: {recurrence.recurrence.description}")
        print(
            f"   Recurrence period: {recurrence.period_start} to {recurrence.period_end}"
        )

    # 4. Add another recurrence with a different pattern
    print("\n4. Adding a daily repeating recurrence...")
    second_recurrence_id = session.availability.add_revision_recurrence(
        target=CHIMNEY_PATH,
        event_id=revision.event_id,
        period_start=datetime(2023, 2, 1, tzinfo=dateutil.tz.UTC),
        period_end=datetime(2023, 2, 2, tzinfo=dateutil.tz.UTC),
        recurrence=Recurrence(
            status="Planned",
            description="Second Recurrence",
            recurrence_type=RecurrenceType.DAILY,
            recur_every=2,
            recur_until=datetime(2023, 2, 15, tzinfo=dateutil.tz.UTC),
        ),
    )
    print(f"   Added second recurrence with ID: {second_recurrence_id}")

    # 5. Search for the revision using search_availability_events
    print("\n5. Searching for revisions...")
    search_results = session.availability.search_availability_events(
        event_type=EventType.REVISION,
        targets=[CHIMNEY_PATH],
    )
    print(f"   Found {len(search_results)} revision events")
    for i, result in enumerate(search_results):
        if isinstance(result, Revision):
            print(
                f"   Result {i+1}: Event ID: {result.event_id}, Reason: {result.reason}"
            )

    # 6. Search for instances of the revision
    print("\n6. Searching for specific instances of the revision...")
    instances = session.availability.search_instances(
        target=CHIMNEY_PATH,
        event_id="event_id",
        period_start=datetime(2023, 2, 1, tzinfo=dateutil.tz.UTC),
        period_end=datetime(2023, 2, 15, tzinfo=dateutil.tz.UTC),
    )
    print(f"   Found {len(instances)} instances")
    # These are the actual occurrences based on the recurrence pattern
    for i, instance in enumerate(instances):
        print(
            f"   Instance {i+1}: Period: {instance.period_start} to {instance.period_end}"
        )

    # 7. Update the revision
    print("\n7. Updating the revision...")
    session.availability.update_revision(
        target=CHIMNEY_PATH,
        event_id="event_id",
        new_local_id="updated_local_id",
        new_reason="Updated reason",
    )

    updated_revision = session.availability.get_availability_event(
        target=CHIMNEY_PATH, event_id="event_id"
    )
    print(f"   Updated local ID: {updated_revision.local_id}")
    print(f"   Updated reason: {updated_revision.reason}")
    print(f"   Last modified: {updated_revision.last_changed.timestamp}")

    # 8. Delete a specific recurrence
    print("\n8. Deleting the second recurrence pattern...")
    session.availability.delete_revision_recurrence(
        target=CHIMNEY_PATH,
        event_id="event_id",
        recurrence_id=second_recurrence_id,
    )

    revision_after_delete = session.availability.get_availability_event(
        target=CHIMNEY_PATH, event_id="event_id"
    )
    print(f"   Recurrences remaining: {len(revision_after_delete.recurrences)}")

    # 9. Finally, delete the revision using delete_availability_events_by_id
    print("\n9. Deleting the entire revision...")
    session.availability.delete_availability_events_by_id(
        target=CHIMNEY_PATH,
        event_ids=["event_id"],
    )

    remaining_revisions = session.availability.search_availability_events(
        event_type=EventType.REVISION,
        targets=[CHIMNEY_PATH],
    )
    print(f"   Revisions found: {len(remaining_revisions)}")

    print("\n=== Revision Workflow Example Completed ===\n")


def restriction_workflow(session: Connection.Session):
    """
    Demonstrates a complete workflow for creating, managing, and deleting restrictions in Mesh.
    """
    print("\n=== Starting Restriction Workflow Example ===\n")

    # 1. Create a basic restriction with constant value
    print("1. Creating a basic restriction...")
    basic_restriction = session.availability.create_restriction(
        target=CHIMNEY_PATH,
        event_id="basic_restriction_id",
        local_id="basic_local_id",
        reason="basic restriction",
        category="DischargeMin[m3/s]",
        recurrence=RestrictionBasicRecurrence(
            recurrence=Recurrence(
                status="SelfImposed",
                description="Basic restriction",
                recurrence_type=RecurrenceType.WEEKLY,
                recur_every=1,
                recur_until=datetime(2023, 1, 31, tzinfo=dateutil.tz.UTC),
            ),
            period_start=datetime(2023, 1, 2, tzinfo=dateutil.tz.UTC),  # Monday
            period_end=datetime(2023, 1, 3, tzinfo=dateutil.tz.UTC),  # Tuesday
            value=75.5,  # 75.5% capacity
        ),
    )

    print(f"   Created basic restriction with ID: {basic_restriction.event_id}")
    print(f"   Owner ID: {basic_restriction.owner_id}")
    print(f"   Category: {basic_restriction.category}")
    print(f"   Value: {basic_restriction.recurrence.value}")
    print(
        f"   Created by: {basic_restriction.created.author} at {basic_restriction.created.timestamp}"
    )
    print(f"   Status: {basic_restriction.recurrence.recurrence.status}")

    # 2. Create a complex restriction with multiple time points
    print("\n2. Creating a complex restriction with multiple time points...")
    complex_restriction = session.availability.create_restriction(
        target=CHIMNEY_PATH,
        event_id="complex_restriction_id",
        local_id="complex_local_id",
        reason="Complex restriction",
        category="DischargeMax[m3/s]",
        recurrence=RestrictionComplexRecurrence(
            recurrence=Recurrence(
                status="SelfImposed",
                description="Complex restriction",
                recurrence_type=RecurrenceType.DAILY,
                recur_every=1,
                recur_until=datetime(2023, 1, 15, tzinfo=dateutil.tz.UTC),
            ),
            time_points=[
                TimePoint(
                    value=80.0,
                    timestamp=datetime(2023, 1, 1, 8, 0, tzinfo=dateutil.tz.UTC),
                ),
                TimePoint(
                    value=60.0,
                    timestamp=datetime(2023, 1, 1, 12, 0, tzinfo=dateutil.tz.UTC),
                ),
                TimePoint(
                    value=70.0,
                    timestamp=datetime(2023, 1, 1, 16, 0, tzinfo=dateutil.tz.UTC),
                ),
                TimePoint(
                    value=90.0,
                    timestamp=datetime(2023, 1, 1, 20, 0, tzinfo=dateutil.tz.UTC),
                ),
            ],
        ),
    )

    print(f"   Created complex restriction with ID: {complex_restriction.event_id}")
    print(f"   Category: {complex_restriction.category}")
    print(
        f"   Number of time points: {len(complex_restriction.recurrence.time_points)}"
    )
    print(
        f"   Recurrence type: {complex_restriction.recurrence.recurrence.recurrence_type.name}"
    )
    print(f"   Repeats until: {complex_restriction.recurrence.recurrence.recur_until}")

    # 3. Search for restrictions
    print("\n3. Searching for all restrictions...")
    restrictions = session.availability.search_availability_events(
        event_type=EventType.RESTRICTION,
        targets=[CHIMNEY_PATH],
    )
    print(f"   Found {len(restrictions)} restrictions")
    for i, restriction in enumerate(restrictions):
        print(
            f"   Restriction {i+1}: ID: {restriction.event_id}, Category: {restriction.category}"
        )

    # 4. Get the basic restriction by ID
    print("\n4. Getting the basic restriction details...")
    retrieved_restriction = session.availability.get_availability_event(
        target=CHIMNEY_PATH,
        event_id="basic_restriction_id",
    )
    print(f"   Retrieved restriction with ID: {retrieved_restriction.event_id}")
    print(f"   Local ID: {retrieved_restriction.local_id}")
    print(f"   Reason: {retrieved_restriction.reason}")
    print(f"   Category: {retrieved_restriction.category}")

    # Check the type of recurrence and print the value accordingly
    # In this case we expect the recurrence to be of type RestrictionBasicRecurrence
    if isinstance(retrieved_restriction.recurrence, RestrictionBasicRecurrence):
        print(f"   Value: {retrieved_restriction.recurrence.value}")
    elif isinstance(retrieved_restriction.recurrence, RestrictionComplexRecurrence):
        # In case of complex recurrence we can print the time points
        for i, point in enumerate(retrieved_restriction.recurrence.time_points):
            print(
                f"   Time point {i+1}: Value: {point.value}, Timestamp: {point.timestamp}"
            )

    # 4.5. Get the complex restriction by ID
    print("\n4.5. Getting the complex restriction details...")
    retrieved_restriction = session.availability.get_availability_event(
        target=CHIMNEY_PATH,
        event_id="complex_restriction_id",
    )
    print(f"   Retrieved restriction with ID: {retrieved_restriction.event_id}")
    print(f"   Local ID: {retrieved_restriction.local_id}")
    print(f"   Reason: {retrieved_restriction.reason}")
    print(f"   Category: {retrieved_restriction.category}")

    # Check the type of recurrence and print the value accordingly
    # In this case we expect the recurrence to be of type RestrictionComplexRecurrence
    if isinstance(retrieved_restriction.recurrence, RestrictionBasicRecurrence):
        print(f"   Value: {retrieved_restriction.recurrence.value}")
    elif isinstance(retrieved_restriction.recurrence, RestrictionComplexRecurrence):
        # In case of complex recurrence we can print the time points
        for i, point in enumerate(retrieved_restriction.recurrence.time_points):
            print(
                f"   Time point {i+1}: Value: {point.value}, Timestamp: {point.timestamp}"
            )

    # 5. Search for instances within a time period
    print("\n5. Searching for specific instances of the basic restriction...")
    instances = session.availability.search_instances(
        target=CHIMNEY_PATH,
        event_id="basic_restriction_id",
        period_start=datetime(2023, 1, 1, tzinfo=dateutil.tz.UTC),
        period_end=datetime(2023, 1, 31, tzinfo=dateutil.tz.UTC),
    )
    print(f"   Found {len(instances)} instances")
    for i, instance in enumerate(instances):
        print(
            f"   Instance {i+1}: Period: {instance.period_start} to {instance.period_end}, Value: {instance.value}"
        )

    # 6. Update restriction
    print("\n6. Updating the basic restriction...")
    session.availability.update_restriction(
        target=CHIMNEY_PATH,
        event_id="basic_restriction_id",
        new_local_id="updated_basic_id",
        new_reason="Updated basic restriction reason",
        new_category="DischargeMax[m3/s]",
    )

    # Verify the update
    updated_restriction = session.availability.get_availability_event(
        target=CHIMNEY_PATH,
        event_id="basic_restriction_id",
    )
    print(f"   Updated local ID: {updated_restriction.local_id}")
    print(f"   Updated reason: {updated_restriction.reason}")
    print(f"   Updated category: {updated_restriction.category}")
    print(f"   Last modified: {updated_restriction.last_changed.timestamp}")

    # 7. Update restriction recurrence
    print("\n7. Updating the restriction recurrence...")
    new_recurrence = RestrictionBasicRecurrence(
        recurrence=Recurrence(
            status="SelfImposed",
            description="Updated restriction recurrence",
            recurrence_type=RecurrenceType.NONE,
        ),
        period_start=datetime(2023, 2, 1, tzinfo=dateutil.tz.UTC),
        period_end=datetime(2023, 2, 10, tzinfo=dateutil.tz.UTC),
        value=50.0,
    )

    session.availability.update_restriction(
        target=CHIMNEY_PATH,
        event_id="basic_restriction_id",
        new_restriction_recurrence=new_recurrence,
    )

    updated_restriction = session.availability.get_availability_event(
        target=CHIMNEY_PATH,
        event_id="basic_restriction_id",
    )
    print(
        f"   New recurrence period: {updated_restriction.recurrence.period_start} to {updated_restriction.recurrence.period_end}"
    )
    print(f"   New value: {updated_restriction.recurrence.value}")
    print(
        f"   New recurrence type: {updated_restriction.recurrence.recurrence.recurrence_type.name}"
    )

    # 8. Delete restrictions
    print("\n8. Deleting restrictions...")
    session.availability.delete_availability_events_by_id(
        target=CHIMNEY_PATH,
        event_ids=["basic_restriction_id", "complex_restriction_id"],
    )

    # Verify deletion
    remaining_restrictions = session.availability.search_availability_events(
        event_type=EventType.RESTRICTION,
        targets=[CHIMNEY_PATH],
    )

    print(f"   Restrictions found: {len(remaining_restrictions)}")

    print("\n=== Restriction Workflow Example Completed ===\n")


def main(address, tls_root_pem_cert):
    """Showing how to create a revision."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        revision_workflow(session)
        restriction_workflow(session)


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)

Check duplicated time series

#!/usr/bin/env python3

import uuid
from typing import Union

import helpers

from volue.mesh import Connection, TimeseriesAttribute


def get_pem_certificate_contents(certificate_path: str):
    """Reads the contents of a PEM certificate file."""

    tls_root_pem_cert = ""

    with open(certificate_path, "rb") as file:
        # In case multiple root certificates are needed, e.g.:
        # the same client accesses different Mesh servers (with different root certs)
        # Just combine into single file the root certificates, like:
        # -----BEGIN CERTIFICATE-----
        # ...(first certificate)...
        # -----END CERTIFICATE-----
        # -----BEGIN CERTIFICATE-----
        # ..(second certificate)..
        # -----END CERTIFICATE-----
        tls_root_pem_cert = file.read()

    return tls_root_pem_cert
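
# Example usage of the helper above (a sketch; this script instead obtains
# the certificate via helpers.get_connection_info):
#
# tls_root_pem_cert = get_pem_certificate_contents("c:/certificate.pem")
# connection = Connection.with_tls(address, tls_root_pem_cert)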


def find_time_series_duplicates(
    session: Connection.Session, model: Union[str, uuid.UUID]
) -> dict[int, list[str]]:
    """
    Iterates over all Mesh model objects and stores time series key of physical
    or virtual time series connected to time series attributes.
    """
    time_series_info = {}

    for obj in session.search_for_objects(model, "{*}"):
        for attr in obj.attributes.values():
            if isinstance(attr, TimeseriesAttribute):
                if attr.time_series_resource is not None:
                    timeseries_key = attr.time_series_resource.timeseries_key
                    time_series_info.setdefault(timeseries_key, []).append(attr.path)

    return time_series_info


def main(address, tls_root_pem_cert):
    """Checks for duplicated physical or virtual time series in a Mesh model."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    model_name = "SimpleThermalTestModel"

    with connection.create_session() as session:
        print(f"Model: '{model_name}'")
        time_series_info = find_time_series_duplicates(session, f"Model/{model_name}")

        for timeseries_key, paths in time_series_info.items():
            if len(paths) > 1:
                print(
                    f"Time series key {timeseries_key} is connected to {len(paths)} time series attributes:"
                )
                for path in paths:
                    print(f"  {path}")

    print("Check for duplicated time series done.")


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)

Create physical time series

import helpers
import random
import string

from volue.mesh import Connection, Timeseries


def main(address, tls_root_pem_cert):
    """Showing how to create a physical time series resource."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        # Mesh will throw an exception if we try to create a time series with an existing path
        # and name. Add a random suffix to the time series name to avoid this.
        name_suffix = get_random_suffix()

        result = session.create_physical_timeseries(
            path="/Path/To/Test/Timeseries/",
            name="Test_Timeseries_" + name_suffix,
            curve_type=Timeseries.Curve.PIECEWISELINEAR,
            resolution=Timeseries.Resolution.HOUR,
            unit_of_measurement="cm",
        )

        session.commit()

        print(result)
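
        # The new time series can afterwards be connected to a time series
        # attribute in a model (a sketch; the attribute path below is
        # illustrative, and the change needs another commit):
        # session.update_timeseries_attribute(
        #     "Model/SimpleThermalTestModel/SomePowerPlant1.TsRawAtt",
        #     new_timeseries_resource_key=result.timeseries_key,
        # )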


def get_random_suffix() -> str:
    random_chars = 10

    return "".join(
        random.choices(string.ascii_uppercase + string.digits, k=random_chars)
    )


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)

Search calculation expressions

import helpers

from volue.mesh import Connection, OwnershipRelationAttribute, TimeseriesAttribute

keyword = "Income"
local_expressions = {}
template_expressions = {}


def add_expression_to_results(attr):
    """
    Checks if the attribute expression contains the keyword and adds it to the appropriate results dictionary.
    """

    if keyword in attr.expression:
        # A local expression is set on the attribute level.
        # A template expression is set on the attribute definition level.
        # By default all attributes inherit the template expression from
        # their attribute definition. If the calculation expression is
        # changed explicitly for a given attribute, it is called a local
        # expression.
        if attr.is_local_expression:
            local_expressions[attr.path] = attr.expression
        else:
            template_expressions[attr.definition.path] = attr.expression


def search_calculation_expressions(session: Connection.Session, target):
    """
    Traverses the Mesh model recursively and finds all time series
    attributes that have calculation expressions containing specific string.
    """
    object = session.get_object(target, full_attribute_info=True)

    for attr in object.attributes.values():
        if isinstance(attr, OwnershipRelationAttribute):
            for child_id in attr.target_object_ids:
                search_calculation_expressions(session, child_id)
        elif isinstance(attr, TimeseriesAttribute):
            # add_expression_to_results checks for the keyword itself.
            add_expression_to_results(attr)


def search_calculation_expressions_faster(session: Connection.Session, target):
    """
    Get all objects from the Mesh model and search for time series
    attributes that have calculation expressions containing specific string.

    This method (with search_for_objects) is more efficient than
    search_calculation_expressions, that does top-down traversal of the Mesh model.
    `search_for_objects` retrieves all objects in a single streaming request.
    Downside is that is does not preserve the order of objects in the Mesh model.
    """
    for obj in session.search_for_objects(target, "{*}", full_attribute_info=True):
        for attr in obj.attributes.values():
            if isinstance(attr, TimeseriesAttribute):
                add_expression_to_results(attr)


def main(address, tls_root_pem_cert):
    """Searches all Mesh models for calculation expressions containing the keyword."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        models = session.list_models()
        for model in models:
            print(f"\nModel: '{model.name}'")
            # search_calculation_expressions(session, model.id)
            search_calculation_expressions_faster(session, model.id)

            for path, expression in template_expressions.items():
                print(
                    f"Attribute definition path: {path} has template expression:\n{expression}"
                )

            for path, expression in local_expressions.items():
                print(f"Attribute path: {path} has local expression:\n{expression}")

            # clear search results before traversing the next model
            local_expressions.clear()
            template_expressions.clear()


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)