Examples

This section contains code examples. Most of them require specific Mesh model on the server side. Best practice is to copy the examples scripts that are of interest for you together with helpers.py

Best practices

  1. Checkout or open in GitHub git tag corresponding to the Mesh Python SDK version you are using. E.g. for Mesh Python SDK v1.7 it is: https://github.com/Volue-Public/energy-mesh-python/tree/v1.7.0

  2. Copy either the whole examples directory or specific examples script(s) together with helpers.py and paste it to your own workspace.

  3. Run example:

    python .\examples\get_version.py localhost:50051 c:\certificate.pem
    

    First argument - Mesh server address with port. Default value is localhost:50051.

    Second argument - path to PEM-encoded TLS certificate used by Mesh server. Skip it if the Mesh server is configured to accept insecure gRPC connections. If provided, then make sure that instead of volue.mesh.Connection.Session.insecure(), the volue.mesh.Connection.Session.with_tls() is used to establish connection to Mesh.

Note

Starting from Mesh Python SDK 1.9, in all examples the connection to Mesh server is established using volue.mesh.Connection.Session.insecure(). To use a different connection type, e.g.: with TLS, the user has to change the example script. The PEM-encoded TLS certificate passed as a second argument will be discarded if an insecure connection is used.

Quickstart

import helpers

from volue.mesh import Connection


def main(address, tls_root_pem_cert):
    """Showing the quickest way to get started."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    # Which version is the server running
    version_info = connection.get_version()
    print(f"Connected to {version_info.name} {version_info.version}")

    # Create a remote session on the Volue Mesh server
    session = connection.create_session()
    session.open()
    print("You have now an open session and can request time series")

    # Close the remote session
    session.close()


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)

Authorization

import asyncio

import helpers

import volue.mesh.aio
from volue import mesh


def sync_auth(address, tls_root_pem_cert, service_principal, user_principal):
    print("Synchronous authentication example: ")

    connection = mesh.Connection.with_kerberos(
        address, tls_root_pem_cert, service_principal, user_principal
    )

    user_identity = connection.get_user_identity()
    print(user_identity)

    # revoke no longer used token
    connection.revoke_access_token()


async def async_auth(address, tls_root_pem_cert, service_principal, user_principal):
    print("Asynchronous authentication example:")

    connection = mesh.aio.Connection.with_kerberos(
        address, tls_root_pem_cert, service_principal, user_principal
    )

    user_identity = await connection.get_user_identity()
    print(user_identity)

    # revoke no longer used token
    await connection.revoke_access_token()


def main(address, tls_root_pem_cert):
    """Showing how to authorize to gRPC Mesh server."""

    # If Mesh gRPC server is running as a service user,
    # for example LocalSystem, NetworkService or a user account
    # with a registered service principal name then it is enough
    # to provide hostname as service principal, e.g.:
    #   'HOST/hostname.companyad.company.com'
    # If Mesh gRPC server is running as a user account without
    # registered service principal name then it is enough to provide
    # user account name running Mesh server as service principal, e.g.:
    #   'ad\\user.name' or r'ad\user.name'
    # Note: winkerberos converts service principal name if provided in
    #       RFC-2078 format. '@' is converted to '/' if there is no '/'
    #       character in the service principal name. E.g.:
    #           service@hostname
    #       Would be converted to:
    #           service/hostname
    service_principal = "HOST/hostname.companyad.company.com"
    user_principal = None

    sync_auth(address, tls_root_pem_cert, service_principal, user_principal)
    asyncio.run(
        async_auth(address, tls_root_pem_cert, service_principal, user_principal)
    )


if __name__ == "__main__":
    # This will authenticate Python client (receive authorization token from Mesh),
    # then send gRPC request that requires authorization (e.g.: GetUserIdentity)
    # and print the result. If your user name info is printed, you have successfully
    # communicated with the server.
    #
    # This requires Mesh server to be running with enabled TLS and Kerberos options.

    address, tls_root_pem_cert = helpers.get_connection_info()
#    main(address, tls_root_pem_cert)

Connect

import helpers

from volue.mesh import Connection


def get_version(connection):
    """Showing how to send get the server version."""
    print("1. Requesting server version")
    version = connection.get_version()
    print(f"2. Server version is {version.version}")


def start_and_end_session(session):
    """Showing how to start and end a session."""
    print("A. Starting session")
    session.open()
    print("B. Ending session")
    session.close()


def main(address, tls_root_pem_cert):
    """Showing how to connect to a server and run two tasks sequentially."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    get_version(connection)
    start_and_end_session(connection.create_session())


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)
    print("Done")

# Outputs:
# 1. Requesting server version
# 2. Server version is 1.12.5.0-dev
# A. Starting session
# B. Ending session
# Done

Connect, asynchronously

import asyncio

import helpers

from volue.mesh.aio import Connection


async def get_version(connection):
    """Showing how to get the server version."""
    print("1. Requesting server version")
    version = await connection.get_version()
    print(f"2. Server version is {version.version}")


async def start_and_end_session(session):
    """Showing how to start and end a session."""
    print("A. Starting session")
    await session.open()
    print("B. Ending session")
    await session.close()


async def main(address, tls_root_pem_cert):
    """Showing how to connect to a server and run two tasks concurrently."""
    # Creating a connection, but not sending any requests yet.

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    # Indicate that these two functions can be run concurrently.
    await asyncio.gather(
        get_version(connection), start_and_end_session(connection.create_session())
    )


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    asyncio.run(main(address, tls_root_pem_cert))
    print("Done")

# Outputs:
# 1. Requesting server version
# A. Starting session
# 2. Server version is 1.12.5.0-dev
# B. Ending session
# Done

Connect using external access token

from datetime import datetime

import helpers

from volue.mesh import Connection


def main(address, tls_root_pem_cert):
    """
    Showing how to authorize to gRPC Mesh server using externally obtained
    access token, e.g: a OAuth JWT. Obtaining the access token is out of scope
    for this example.

    Depending on your environment, e.g.: Azure AD, using libraries like
    Microsoft Authentication Library (MSAL) for getting the tokens is
    suggested.
    """

    token = "my_token"
    connection = Connection.with_external_access_token(
        address, tls_root_pem_cert, access_token=token
    )

    with connection.create_session() as session:
        # Print user information contained in the access token.
        user_identity = connection.get_user_identity()
        print(user_identity)

        # Read some time series data.
        # This requires the user has time series read permissions.
        timeseries_key = 1388
        timeseries = session.read_timeseries_points(
            timeseries_key, datetime(2014, 1, 1), datetime(2015, 1, 1)
        )
        print(timeseries.arrow_table.to_pandas())

        # For long running sessions it may be necessary to refresh the access
        # token.
        # Other possibility would be to catch grpc.RpcError with status code
        # UNAUTHENTICATED and then get new access token and update it in the
        # Mesh connection using `update_external_access_token`.
        connection.update_external_access_token("my_new_access_token")


if __name__ == "__main__":
    # This requires Mesh server to be running with enabled TLS and OAuth options.
    # Obtaining access token is out of the scope for this example.

    address, tls_root_pem_cert = helpers.get_connection_info()
    # main(address, tls_root_pem_cert)

Get version

import asyncio

import helpers

import volue.mesh.aio
from volue import mesh


def sync_get_version(address, tls_root_pem_cert):
    print("Synchronous get version:")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(address, tls_root_pem_cert)
    connection = mesh.Connection.insecure(address)

    version_info = connection.get_version()
    print(version_info.version)


async def async_get_version(address, tls_root_pem_cert):
    print("Asynchronous get version:")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.aio.Connection.with_tls(address, tls_root_pem_cert)
    connection = mesh.aio.Connection.insecure(address)

    version_info = await connection.get_version()
    print(version_info.version)


if __name__ == "__main__":
    # This will request and print version info from the mesh server.
    # If some sensible version info is printed, you have successfully
    # communicated with the server.
    address, tls_root_pem_cert = helpers.get_connection_info()
    sync_get_version(address, tls_root_pem_cert)
    asyncio.run(async_get_version(address, tls_root_pem_cert))

Read time series

import uuid
from datetime import datetime

import helpers

from volue.mesh import Connection


def read_timeseries_points(session: Connection.Session):
    """Showing how to read time series points."""

    # Define the time series identifier, it can be:
    # - time series key of a physical time series
    # - path of a time series attribute that is connected to a physical time series
    # - ID of a time series attribute that is connected to a physical time series
    timeseries_key = 3
    timeseries_attribute_path = "Model/SimpleThermalTestModel/ThermalComponent/SomePowerPlant1/SomePowerPlantChimney2.TsRawAtt"
    timeseries_attribute_id = uuid.UUID("e5df77a9-8b60-4b0a-aa1b-3c3957c538a0")

    # Defining a time interval to read time series from.
    # If no time zone is provided then it will be treated as UTC.
    start = datetime(2016, 1, 1, 6, 0, 0)
    end = datetime(2016, 1, 1, 8, 0, 0)

    # Each time series point occupies 20 bytes. By default gRPC has a limitation of 4MB inbound message size.
    # In case of larger data volumes please send request data in chunks.
    # E.g.: call multiple times `read_timeseries_points` with shorter interval.

    # Send request to read time series based on time series key.
    timeseries = session.read_timeseries_points(
        target=timeseries_key, start_time=start, end_time=end
    )
    print(f"Read {timeseries.number_of_points} points using time series key.")
    print(timeseries.arrow_table.to_pandas())

    # Send requests to read time series based on time series attribute path.
    timeseries = session.read_timeseries_points(
        target=timeseries_attribute_path, start_time=start, end_time=end
    )
    print(
        f"Read {timeseries.number_of_points} points using time series attribute path."
    )
    print(timeseries.arrow_table.to_pandas())

    # Send requests to read time series based on time series attribute ID.
    # Attribute IDs are auto-generated when an object is created.
    # That is why we can't use any fixed ID in this example and the code is commented out.
    # timeseries = session.read_timeseries_points(
    #     target=timeseries_attribute_id, start_time=start, end_time=end
    # )
    # print(f"Read {timeseries.number_of_points} points using time series attribute ID.")
    # print(timeseries.arrow_table.to_pandas())


def main(address, tls_root_pem_cert):
    """Showing how to get time series points."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        read_timeseries_points(session)


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)

Read time series, asynchronously

import asyncio
import uuid
from datetime import datetime

import helpers

from volue.mesh.aio import Connection


async def read_timeseries_points_async(session: Connection.Session):
    """Showing how to read time series points."""

    # Define the time series identifier, it can be:
    # - time series key of a physical time series
    # - path of a time series attribute that is connected to a physical time series
    # - ID of a time series attribute that is connected to a physical time series
    timeseries_key = 3
    timeseries_attribute_path = "Model/SimpleThermalTestModel/ThermalComponent/SomePowerPlant1/SomePowerPlantChimney2.TsRawAtt"
    timeseries_attribute_id = uuid.UUID("e5df77a9-8b60-4b0a-aa1b-3c3957c538a0")

    # Defining a time interval to read time series from.
    # If no time zone is provided then it will be treated as UTC.
    start = datetime(2016, 1, 1, 6, 0, 0)
    end = datetime(2016, 1, 1, 8, 0, 0)

    # Each time series point occupies 20 bytes. By default gRPC has a limitation of 4MB inbound message size.
    # In case of larger data volumes please send request data in chunks.
    # E.g.: call multiple times `read_timeseries_points` with shorter interval.

    # Send request to read time series based on time series key.
    timeseries = await session.read_timeseries_points(
        target=timeseries_key, start_time=start, end_time=end
    )
    print(f"Read {timeseries.number_of_points} points using time series key.")
    print(timeseries.arrow_table.to_pandas())

    # Send requests to read time series based on time series attribute path.
    timeseries = await session.read_timeseries_points(
        target=timeseries_attribute_path, start_time=start, end_time=end
    )
    print(
        f"Read {timeseries.number_of_points} points using time series attribute path."
    )
    print(timeseries.arrow_table.to_pandas())

    # Send requests to read time series based on time series attribute ID.
    # Attribute IDs are auto-generated when an object is created.
    # That is why we can't use any fixed ID in this example and the code is commented out.
    # timeseries = await session.read_timeseries_points(
    #     target=timeseries_attribute_id, start_time=start, end_time=end
    # )
    # print(f"Read {timeseries.number_of_points} points using time series attribute ID.")
    # print(timeseries.arrow_table.to_pandas())


async def main(address, tls_root_pem_cert):
    """Showing how to get time series points asynchronously."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    async with connection.create_session() as session:
        await read_timeseries_points_async(session)


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    asyncio.run(main(address, tls_root_pem_cert))

Read and process time series, asynchronously

import asyncio
from datetime import datetime

import grpc
import helpers

from volue.mesh.aio import Connection


async def process_timeseries_values(arrow_table):
    # the processing can also be an async IO call
    # to e.g. separate service
    await asyncio.sleep(8)
    print(f"Processing completed - {len(arrow_table)} points were processed")


async def read_timeseries_points(session, path):
    start_time = datetime(2016, 5, 1)
    end_time = datetime(2016, 5, 4)

    # read operation can be a long running operation
    # with asyncio API we can switch to do something else
    # while waiting for the read operation to complete
    # (e.g. doing processing for already returned time series)
    timeseries_read = await session.read_timeseries_points(
        target=path, start_time=start_time, end_time=end_time
    )

    return timeseries_read.arrow_table


async def handle_timeseries(session, path):
    arrow_table = await read_timeseries_points(session, path)
    await process_timeseries_values(arrow_table)


async def main(address, tls_root_pem_cert):
    """
    Showing how to use asynchronous connection in a real-world scenario.
    First multiple time series are returned that match a given query.
    Then each time series is read and some processing is applied.

    Note: Mesh does not yet handle parallel requests, every request is handled
          sequentially.

    This example works with `SimpleThermalTestModel` test model, where 2 time
    series are returned for the given query. Assume reading time series takes
    10 seconds and processing (or some different computation, e.g.
    neural network inference based on that input) takes 8 seconds.
    When waiting for the read operation to complete for the second time series
    we can already start processing the first time series. See:

    Tr = 10s (reading)
    Tp = 8s  (processing)

    Asynchronous code (with Mesh handling requests sequentially):
    |   Tr1  ||  Tp1 |
    ------------------
              |   Tr2  ||  Tp2 |
              ------------------
    ---------- 28s -------------

    Whereas for synchronous code:
    |   Tr1  ||  Tp1 |
    ------------------
                      |   Tr2  ||  Tp2 |
                      ------------------
    ---------------  36s ---------------

    For 2 time series we save ~22% time.
    For 10 time series it would be 40% time (108s instead of 180s).

    Note: the processing could be also done using async IO
          (e.g. requests send to different service).
    """

    query = "*.TsRawAtt"
    start_object_path = "Model/SimpleThermalTestModel/ThermalComponent"

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    async with connection.create_session() as session:
        try:
            timeseries_attributes = await session.search_for_timeseries_attributes(
                target=start_object_path, query=query
            )
        except grpc.RpcError as e:
            print(f"Could not find timeseries attribute: {e}")
            return

        print(f"Number of found time series: {len(timeseries_attributes)}")
        await asyncio.gather(
            *(
                handle_timeseries(session, timeseries_attribute.path)
                for timeseries_attribute in timeseries_attributes
            )
        )


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    asyncio.run(main(address, tls_root_pem_cert))

Search for time series attributes

import helpers

from volue.mesh import Connection, OwnershipRelationAttribute, TimeseriesAttribute


def search_method_1(session: Connection.Session):
    """
    This method uses `search_for_timeseries_attributes` function.
    Wildcard search expression for attributes is not supported, meaning we
    can't get all time series attributes using simply "*" search expression.

    Useful e.g.: when searching for attributes with known names.
    """
    print("Search method 1")

    # Specify what you want to search for

    # The Mesh object to start searching from
    start_object_path = "Model/SimpleThermalTestModel/ThermalComponent"
    # OR
    # start_object_guid = uuid.UUID("0000000b-0001-0000-0000-000000000000")  # ThermalComponent

    # The query expressed using Mesh search language syntax
    # Traverse all children (*) of the start object, not case-sensitive ({}) and
    # accept all that has an attribute (.) called TsRawAtt
    query = "{*}.TsRawAtt"

    # Search for time series attributes using this query
    timeseries_attributes = session.search_for_timeseries_attributes(
        target=start_object_path, query=query
    )

    print(f"Number of found time series: {len(timeseries_attributes)}")
    for attribute in timeseries_attributes:
        if isinstance(attribute, TimeseriesAttribute):
            print(attribute.path)


def search_method_2(session: Connection.Session):
    """
    This method uses `search_for_objects` function with wildcard search
    expression ("*") that returns all objects.
    Useful when you want to traverse the complete model.

    Additionally here we show how to distinguish if time series attribute has
    a time series calculation expression defined or a physical time series
    connected to it.
    """
    print("Search method 2")

    # Provide root object/model as the start object to run the search.
    # Root objects/models do not contain any attributes.
    start_object_path = "Model/SimpleThermalTestModel"

    # Returns every object.
    query = "*"

    # Depending on the model size this may take long time to complete.
    objects = session.search_for_objects(target=start_object_path, query=query)

    print(f"Number of found objects: {len(objects)}")
    for object in objects:
        print(object.name)
        for attribute in object.attributes.values():
            if isinstance(attribute, TimeseriesAttribute):
                # If time series resource is set, then it means a physical
                # time series is connected to the attribute.
                if attribute.time_series_resource is not None:
                    print(
                        f"{attribute.path} has physical time series connected with time series key {attribute.time_series_resource.timeseries_key}"
                    )


def search_method_3(session: Connection.Session):
    """
    This method uses `get_object` function.
    Useful when you want to traverse a subset of the model from a given node,
    but not root object/model. Models do not contain attributes, so you won't
    be able to traverse the child objects using ownership attributes.
    """
    print("Search method 3")

    def traverse_child_objects(session: Connection.Session, target):
        object = session.get_object(target, full_attribute_info=True)

        for attribute in object.attributes.values():
            if isinstance(attribute, OwnershipRelationAttribute):
                for child_id in attribute.target_object_ids:
                    traverse_child_objects(session, child_id)

            if isinstance(attribute, TimeseriesAttribute):
                print(attribute.path)

    object_path = "Model/SimpleThermalTestModel/ThermalComponent/SomePowerPlant1"
    traverse_child_objects(session, object_path)


def main(address, tls_root_pem_cert):
    """Showing how to search for Mesh time series attributes in various ways."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    # Create a remote session on the Volue Mesh server.
    with connection.create_session() as session:
        search_method_1(session)
        search_method_2(session)
        search_method_3(session)


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)

Traverse model

import helpers

from volue.mesh import Connection, OwnershipRelationAttribute

leaves = []


def traverse_model_top_down(session: Connection.Session, target, depth=0):
    """Traverses the Mesh model recursively."""
    object = session.get_object(target)
    print(f"{'..' * depth}{object.name}")
    leaf = True

    for attr in object.attributes.values():
        if isinstance(attr, OwnershipRelationAttribute):
            for child_id in attr.target_object_ids:
                leaf = False
                traverse_model_top_down(session, child_id, depth + 1)
    if leaf:
        leaves.append(object)


def traverse_model_bottom_up(session: Connection.Session, target, model):
    object = session.get_object(target)
    depth = object.path.count("/") - 1
    print(f"{'..' * depth}{object.name}")
    if object.owner_id == model.id:
        print(model.name)
        return
    attribute = session.get_attribute(object.owner_id)
    traverse_model_bottom_up(session, attribute.owner_id, model)


def main(address, tls_root_pem_cert):
    """Showing how to traverse Mesh model."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        models = session.list_models()
        for model in models:
            leaves.clear()
            print(f"\nModel: '{model.name}'")
            print("Top-bottom traversal:")
            traverse_model_top_down(session, model.id)
            # Excepted output:
            # Model
            # ..ChildObject1
            # ....SubChildObject1
            # ....SubChildObject2
            # ..ChildObject2
            print("\nBottom-top traversal:")
            traverse_model_bottom_up(session, leaves[0].id, model)
            # Excepted output:
            # ....SubChildObject1
            # ..ChildObject1
            # Model


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)

Using time series with pandas

from datetime import datetime

import grpc
import helpers
import pandas as pd
import pyarrow as pa
from dateutil import tz

from volue.mesh import Connection, Timeseries
from volue.mesh.calc import transform as Transform
from volue.mesh.calc.common import Timezone


def main(address, tls_root_pem_cert):
    """Showing how to find time series, write, read points from it and convert them to pandas format."""

    query = "*[.Name=SomePowerPlantChimney2].TsRawAtt"  # make sure only 1 time series is returned
    start_object_path = "Model/SimpleThermalTestModel/ThermalComponent"

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        # first lets find a time series in our model
        try:
            timeseries_attributes = session.search_for_timeseries_attributes(
                start_object_path, query
            )
        except grpc.RpcError as e:
            print(f"Could not find time series attribute: {e}")
            return

        if len(timeseries_attributes) == 0:
            print("No such time series attribute in the given model/database")
            return

        print(f"Number of found time series: {len(timeseries_attributes)}")

        # pick the first time series and do some operations with it
        timeseries_attribute = timeseries_attributes[0]
        print("Working on timeseries with path: " + timeseries_attribute.path)

        # check for example the curve type of the connected physical time series
        print(f"Curve: {timeseries_attribute.time_series_resource.curve_type}")

        # now lets write some data to it
        try:
            # Mesh data is organized as an Arrow table with the following schema:
            # utc_time - [pa.timestamp('ms')] as a UTC Unix timestamp expressed in milliseconds
            # flags - [pa.uint32]
            # value - [pa.float64]

            number_of_points = 72
            timestamps = []
            values = []
            for i in range(0, number_of_points):
                hours = i % 24
                days = int(i / 24) + 1
                timestamps.append(
                    datetime(2016, 5, days, hours)
                )  # if no time zone is provided then the timestamp is treated as UTC
                values.append(days * 10)

            flags = [Timeseries.PointFlags.OK.value] * number_of_points

            arrays = [pa.array(timestamps), pa.array(flags), pa.array(values)]
            arrow_table = pa.Table.from_arrays(arrays, schema=Timeseries.schema)

            timeseries = Timeseries(
                table=arrow_table, full_name=timeseries_attribute.path
            )
            session.write_timeseries_points(timeseries)

        except grpc.RpcError as e:
            print(f"Could not write timeseries points: {e}")

        local_time_zone = tz.tzlocal()

        # now lets read from it
        try:
            # lets use local time zone (read from operating system settings)
            start_time = datetime(2016, 5, 1, tzinfo=local_time_zone)
            end_time = datetime(2016, 5, 4, tzinfo=local_time_zone)

            timeseries_read = session.read_timeseries_points(
                target=timeseries_attribute, start_time=start_time, end_time=end_time
            )

            # convert to pandas format
            # the timestamps in PyArrow table are always returned in UTC format
            pandas_series = timeseries_read.arrow_table.to_pandas()

            # lets convert it back to local time zone
            # first convert to UTC time zone aware datetime object and then to local time zone (set in operating system)
            pandas_series["utc_time"] = pd.to_datetime(
                pandas_series["utc_time"], utc=True
            ).dt.tz_convert(local_time_zone)
            print(pandas_series)

            # notice that depending on the local time zone there is a shift in the data
            # e.g. for UTC+2 time zone, first 2 values will be NaN, because writing time series points in the previous step
            # is using time zone naive datetime object, so they are treated as UTC.

            # do some further processing

        except grpc.RpcError as e:
            print(f"Could not read timeseries points: {e}")

        # now lets read transformations from it (transform to days)
        print("Transform resolution to days:")
        try:
            start_time = datetime(2016, 5, 1, tzinfo=local_time_zone)
            end_time = datetime(2016, 5, 4, tzinfo=local_time_zone)

            # Transform function may take optionally a time zone argument.
            # Refer to `transform` documentation for more details.
            # If you are using `LOCAL` or `STANDARD` time zone then make sure
            # the Mesh server is operating in the same time zone or adjust properly.
            transformed_timeseries = session.transform_functions(
                timeseries_attribute, start_time, end_time
            ).transform(Timeseries.Resolution.DAY, Transform.Method.SUM, Timezone.LOCAL)

            # convert to pandas format
            # the timestamps in PyArrow table are always returned in UTC format
            pandas_series = transformed_timeseries.arrow_table.to_pandas()
            print(pandas_series)

            # lets convert it back to local time zone
            # first convert to UTC time zone aware datetime object and then to local time zone (set in operating system)
            pandas_series["utc_time"] = pd.to_datetime(
                pandas_series["utc_time"], utc=True
            ).dt.tz_convert(local_time_zone)
            print(pandas_series)

            # do some further processing

        except Exception as e:
            print(f"Could not read transformed timeseries points: {e}")

        # optionally discard changes
        session.rollback()


if __name__ == "__main__":
    # This will search for a given time series, write some data,
    # read it and convert to pandas format.

    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)

Working with model (objects and attributes)

import helpers

from volue.mesh import AttributesFilter, Connection, OwnershipRelationAttribute


def main(address, tls_root_pem_cert):
    root_object_path = "Model/SimpleThermalTestModel/ThermalComponent"

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        # Root object has an ownership relation attribute that point to objects
        # of "PlantElementType" type. We want to add new object of this type.
        root_object = session.get_object(root_object_path, full_attribute_info=True)

        # First get the ownership attribute:
        # If we know the name of the attribute we can use:
        ownership_relation_attribute = root_object.attributes["ThermalPowerToPlantRef"]

        # If we don't know the name, but only the type of the target object
        # (the object it points to):
        # Note: This requires `full_attribute_info` flag set to True when
        #       calling `get_object` or `get_attribute`.
        for attribute in root_object.attributes.values():
            if (
                isinstance(attribute, OwnershipRelationAttribute)
                and attribute.definition.target_object_type_name == "PlantElementType"
            ):
                ownership_relation_attribute = attribute

        new_object = session.create_object(
            ownership_relation_attribute, "SomeNewPowerPlant"
        )
        print(f"New object created: {new_object.path}")

        int_attribute = new_object.attributes["Int64Att"]

        # Object returned by `create_object` contains all attributes with basic
        # information.
        print("One of the new object's attributes (basic information):")
        print(int_attribute)

        # Now let's update attribute's value.
        session.update_simple_attribute(int_attribute, 100)

        # Check updated value, but this time get also more information like
        # attribute definition. Use either:
        # - `get_object` with `full_attribute_info` flag
        # - `get_attribute` with `full_attribute_info` flag
        attribute_with_full_info = session.get_attribute(
            int_attribute, full_attribute_info=True
        )
        print("One of the new object's attributes (full information):")
        print(attribute_with_full_info)

        # We can also filter attributes by e.g. name.
        object_with_filtered_attributes = session.get_object(
            new_object,
            attributes_filter=AttributesFilter(name_mask=["DblAtt", "StringAtt"]),
        )
        print(
            f"Filtered attributes count: {len(object_with_filtered_attributes.attributes)}"
        )

        # Now lets change the object name.
        session.update_object(new_object, new_name="NewNamePowerPlant")
        print("Object's name changed.")

        # Delete the updated object.
        # Because we changed object's name, we can't provide old `Object`
        # instance as it is still having the old object name.
        # Use object's ID instead or provide the path to the object explicitly.
        session.delete_object(new_object.id)
        print("Object deleted.")

        # To commit your changes you need to call:
        # session.commit()


if __name__ == "__main__":
    args = helpers.get_connection_info()
    main(*args)

Working with rating curves

from datetime import datetime

import helpers

from volue.mesh import Connection, RatingCurveSegment, RatingCurveVersion


def main(address, tls_root_pem_cert):
    rating_curve_attribute_path = "Model/SimpleThermalTestModel/ThermalComponent.ThermalPowerToPlantRef/SomePowerPlant1.RatingCurveAtt"

    # Defining a time interval to read rating curve versions from.
    # If no time zone is provided then it will be treated as UTC.
    start_time = datetime(2008, 1, 1)
    end_time = datetime(2022, 1, 1)

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        # First read the attribute using `get_attribute`.
        # We can get standard information like name, ID, tags, etc.
        rating_curve_attribute = session.get_attribute(
            rating_curve_attribute_path, full_attribute_info=True
        )
        print(
            f"Basic information about the rating curve attribute: {rating_curve_attribute}\n"
        )

        # Because the rating curve can potentially contain large amounts of data,
        # specialized methods exist to handle those values.
        versions = session.get_rating_curve_versions(
            target=rating_curve_attribute_path, start_time=start_time, end_time=end_time
        )

        print(
            (
                "Rating curve versions for time interval: "
                f"{start_time.strftime('%d.%m.%Y')} - {end_time.strftime('%d.%m.%Y')}:"
            )
        )
        for i, version in enumerate(versions):
            print(f"Version {i+1}:\n{version}")

        if len(versions) == 0 or len(versions[-1].x_value_segments) == 0:
            print("No rating curve versions found. Skip update.")
            return

        # Now for the last version update first segment and add a new one.
        versions[-1].x_value_segments[0].factor_b = -3.2
        versions[-1].x_value_segments.append(RatingCurveSegment(25, -1.1, 2.2, -3.3))
        session.update_rating_curve_versions(
            target=rating_curve_attribute_path,
            start_time=versions[
                0
            ].valid_from_time,  # use first version `valid_from_time` as start interval
            end_time=datetime.max,  # this is the last version
            new_versions=versions,
        )

        # Read once again.
        versions = session.get_rating_curve_versions(
            target=rating_curve_attribute_path, start_time=start_time, end_time=end_time
        )

        print("Updated rating curve versions:")
        for i, version in enumerate(versions):
            print(f"Version {i+1}:\n{version}")

        # Now lets replace last version with new one
        new_segments = [
            RatingCurveSegment(10, 1.1, 1.9, -3.5),
            RatingCurveSegment(21, 2.1, 1.8, -3.2),
        ]

        new_version = RatingCurveVersion(
            valid_from_time=datetime(2010, 6, 1),
            x_range_from=5.0,
            x_value_segments=new_segments,
        )

        session.update_rating_curve_versions(
            target=rating_curve_attribute_path,
            start_time=versions[
                -1
            ].valid_from_time,  # to replace old version use its `valid_from_time` as start interval
            end_time=datetime.max,  # this is the last version
            new_versions=[new_version],
        )

        # Read once again.
        versions = session.get_rating_curve_versions(
            target=rating_curve_attribute_path, start_time=start_time, end_time=end_time
        )

        print("Updated for the second time rating curve versions:")
        for i, version in enumerate(versions):
            print(f"Version {i+1}:\n{version}")

        # to commit your changes you need to call:
        # session.commit()


if __name__ == "__main__":
    args = helpers.get_connection_info()
    main(*args)

Working with sessions

import uuid

import helpers

from volue.mesh import Connection


def main(address, tls_root_pem_cert):
    """Showing different ways of working with sessions."""

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    # 1. Create a session, open and close session.
    session = connection.create_session()
    session.open()
    print("1. You now have a new open session")
    session.close()

    # 2. Create session using the with statement.
    # Session will be created, opened and closed within the 'with' statement scope
    with connection.create_session() as session:
        print("2. You now have a new open session")

    # 3. Connecting to a potentially open session.
    # Session ids can be found in the session object:
    # session.session_id
    # Note: If the session_id is not the id of an open session,
    # the server will create a new one for you.
    # Set the session id you want to connect to
    session_id = uuid.UUID("123e4567-e89b-12d3-a456-556642440000")
    print(f"3. Session id you want to connect to: {session_id}")
    session = connection.connect_to_session(session_id)
    # Try connecting to that session id, if it does not exist, a new one will be created without warning
    session.open()
    print(
        "3. You have now an open session, either the one you requested or a new one if it did not exist"
    )
    # Check which session you are connected to and close it
    print(f"3. Session id you are to connect to: {session.session_id}")
    session.close()


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)

Working with XY sets

import datetime

import helpers

from volue import mesh

OBJECT_PATH = "Model/SimpleThermalTestModel/ThermalComponent.ThermalPowerToPlantRef/SomePowerPlant1"
UNVERSIONED_PATH = OBJECT_PATH + ".XYSetAtt"
VERSIONED_PATH = OBJECT_PATH + ".XYZSeriesAtt"


def main(address, tls_root_pem_cert):
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(address, tls_root_pem_cert)
    connection = mesh.Connection.insecure(address)

    with connection.create_session() as session:
        # In the default test model both the versioned and the unversioned
        # attribute are initially empty. The following two calls are therefore
        # expected to return [].

        print(f"Getting XY-sets from {UNVERSIONED_PATH}")
        xy_sets = session.get_xy_sets(UNVERSIONED_PATH)
        print(xy_sets)

        start_time = datetime.datetime.fromisoformat("1960-01-01")
        end_time = datetime.datetime.fromisoformat("2030-01-01")
        print(
            f"Getting XY-sets in interval [{start_time}, {end_time}) from {VERSIONED_PATH}"
        )
        xy_sets = session.get_xy_sets(VERSIONED_PATH, start_time, end_time)
        print(f"Received: {xy_sets}")

        sample_xy_set = mesh.XySet(
            None, [mesh.XyCurve(0.0, [(1.0, 10.3), (2.5, 15.9)])]
        )

        print(f"Updating XY-set at {UNVERSIONED_PATH}")
        session.update_xy_sets(UNVERSIONED_PATH, new_xy_sets=[sample_xy_set])
        print("Updated")

        print(
            f"Replacing XY-set versions in interval [{start_time}, {end_time}) with one version at {start_time}"
        )
        sample_xy_set.valid_from_time = start_time
        session.update_xy_sets(VERSIONED_PATH, start_time, end_time, [sample_xy_set])
        print("Updated")

        print(f"Getting XY-sets from {UNVERSIONED_PATH}")
        xy_sets = session.get_xy_sets(UNVERSIONED_PATH)
        print(f"Received: {xy_sets}")

        print(
            f"Getting XY-sets in interval [{start_time}, {end_time}) from {VERSIONED_PATH}"
        )
        xy_sets = session.get_xy_sets(VERSIONED_PATH, start_time, end_time)
        print(f"Received: {xy_sets}")


if __name__ == "__main__":
    args = helpers.get_connection_info()
    main(*args)

Write time series

import asyncio
import uuid
from datetime import datetime

import helpers
import pandas as pd
import pyarrow as pa
from dateutil import tz

import volue.mesh.aio
from volue import mesh

# Define the time series identifier, it can be:
# - time series key of a physical time series
# - path of a time series attribute that is connected to a physical time series
# - ID of a time series attribute that is connected to a physical time series
timeseries_key = 3
timeseries_attribute_path = "Model/SimpleThermalTestModel/ThermalComponent/SomePowerPlant1/SomePowerPlantChimney2.TsRawAtt"
timeseries_attribute_id = uuid.UUID("e5df77a9-8b60-4b0a-aa1b-3c3957c538a0")


def get_pa_table_with_time_series_points() -> pa.Table:
    # Defining the data we want to write.
    # Mesh data is organized as an Arrow table with the following schema:
    # utc_time - [pa.timestamp('ms')] as a UTC Unix timestamp expressed in milliseconds
    # flags - [pa.uint32]
    # value - [pa.float64]

    # time_zone = tz.gettz("Europe/Warsaw")
    time_zone = tz.UTC

    arrays = [
        # if no time zone is provided then the timestamp is treated as UTC
        pa.array(
            pd.date_range(
                datetime(2016, 1, 1, 1, tzinfo=time_zone), periods=3, freq="1h"
            )
        ),
        pa.array([mesh.Timeseries.PointFlags.OK.value] * 3),
        pa.array([0.0, 10.0, 1000.0]),
    ]
    return pa.Table.from_arrays(arrays, schema=mesh.Timeseries.schema)


def sync_write_timeseries_points(address, tls_root_pem_cert):
    print("Synchronous write time series points:")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(address, tls_root_pem_cert)
    connection = mesh.Connection.insecure(address)

    with connection.create_session() as session:
        table = get_pa_table_with_time_series_points()

        # Each time series point occupies 20 bytes. Mesh server has a limitation of 4MB inbound message size.
        # In case of larger data volumes please send input data in chunks.
        # E.g.: call multiple times `write_timeseries_points` with shorter interval.

        # Send request to write time series based on time series key.
        timeseries = mesh.Timeseries(table=table, timskey=timeseries_key)
        session.write_timeseries_points(timeseries=timeseries)
        print("Time series points written using time series key.")

        # To only remove time series points we need to provide an empty PyArrow table, but with correct schema.
        empty_table = mesh.Timeseries.schema.empty_table()

        # If `start_time` and `end_time` are not provided explicitly they will be taken from PyArrow `table`.
        # Because we are providing empty table we must specify them explicitly.
        # For this interval all existing points will be removed.
        #
        # End time is exclusive so from the 3 points written by timeseries key,
        # 2 points will be removed by this call.
        #
        # Send request to write time series based on time series attribute path/full_name.
        timeseries = mesh.Timeseries(
            table=empty_table,
            start_time=datetime(2016, 1, 1, 1),
            end_time=datetime(2016, 1, 1, 3),
            full_name=timeseries_attribute_path,
        )
        session.write_timeseries_points(timeseries=timeseries)
        print("Time series points written using time series attribute path.")

        # Let's check it. We should get just one point.
        # Note:
        # - the `timeseries_attribute_path` and `timeseries_key` both point to the same time series attribute.
        # - the `end_time` in `read_timeseries_points` is also exclusive,
        #   but for time series with linear curve type Mesh is also returning one point after the interval.
        timeseries = session.read_timeseries_points(
            target=timeseries_key,
            start_time=datetime(2016, 1, 1, 1),
            end_time=datetime(2016, 1, 1, 3),
        )
        print(f"Read {timeseries.number_of_points} points using time series key.")
        print(timeseries.arrow_table.to_pandas())

        # Send request to write time series based on time series attribute ID.
        # Attribute IDs are auto-generated when an object is created.
        # That is why we can't use any fixed ID in this example and the code will throw.
        try:
            timeseries = mesh.Timeseries(table=table, uuid_id=timeseries_attribute_id)
            session.write_timeseries_points(timeseries=timeseries)
            print("Time series points written using time series attribute ID.")
        except Exception as e:
            print(
                f"failed to write time series points based on time series attribute ID: {e}"
            )
        # Commit the changes to the database.
        # session.commit()

        # Or discard changes.
        session.rollback()


async def async_write_timeseries_points(
    address,
    tls_root_pem_cert,
):
    print("Asynchronous write time series points:")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.aio.Connection.with_tls(address, tls_root_pem_cert)
    connection = mesh.aio.Connection.insecure(address)

    async with connection.create_session() as session:
        table = get_pa_table_with_time_series_points()

        # Each time series point occupies 20 bytes. Mesh server has a limitation of 4MB inbound message size.
        # In case of larger data volumes please send input data in chunks.
        # E.g.: call multiple times `write_timeseries_points` with shorter interval.

        # Send request to write time series based on time series key.
        timeseries = mesh.Timeseries(table=table, timskey=timeseries_key)
        await session.write_timeseries_points(timeseries=timeseries)
        print("Time series points written using time series key.")

        # To only remove time series points we need to provide an empty PyArrow table, but with correct schema.
        empty_table = mesh.Timeseries.schema.empty_table()

        # If `start_time` and `end_time` are not provided explicitly they will be taken from PyArrow `table`.
        # Because we are providing empty table we must specify them explicitly.
        # For this interval all existing points will be removed.
        #
        # End time is exclusive so from the 3 points written by timeseries key,
        # 2 points will be removed by this call.
        #
        # Send request to write time series based on time series attribute path/full_name.
        timeseries = mesh.Timeseries(
            table=empty_table,
            start_time=datetime(2016, 1, 1, 1),
            end_time=datetime(2016, 1, 1, 3),
            full_name=timeseries_attribute_path,
        )
        await session.write_timeseries_points(timeseries=timeseries)
        print("Time series points written using time series attribute path.")

        # Let's check it. We should get just one point.
        # Note:
        # - the `timeseries_attribute_path` and `timeseries_key` both point to the same time series attribute.
        # - the `end_time` in `read_timeseries_points` is also exclusive,
        #   but for time series with linear curve type Mesh is also returning one point after the interval.
        timeseries = await session.read_timeseries_points(
            target=timeseries_key,
            start_time=datetime(2016, 1, 1, 1),
            end_time=datetime(2016, 1, 1, 3),
        )
        print(f"Read {timeseries.number_of_points} points using time series key.")
        print(timeseries.arrow_table.to_pandas())

        # Send request to write time series based on time series attribute ID.
        # Attribute IDs are auto-generated when an object is created.
        # That is why we can't use any fixed ID in this example and the code will throw.
        try:
            timeseries = mesh.Timeseries(table=table, uuid_id=timeseries_attribute_id)
            await session.write_timeseries_points(timeseries=timeseries)
            print("Time series points written using time series attribute ID.")
        except Exception as e:
            print(
                f"failed to write time series points based on time series attribute ID: {e}"
            )

        # Commit the changes to the database.
        # await session.commit()

        # Or discard changes.
        await session.rollback()


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    sync_write_timeseries_points(
        address,
        tls_root_pem_cert,
    )
    asyncio.run(async_write_timeseries_points(address, tls_root_pem_cert))

Run simulations

import asyncio
import logging
from datetime import datetime, timedelta

import helpers

import volue.mesh.aio
from volue import mesh

GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES = 10 * 1024 * 1024  # 10MB


def sync_run_simulation(address, tls_root_pem_cert):
    print("connecting...")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(
    #     address,
    #     tls_root_pem_cert,
    #     grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    # )

    # By default the maximum inbound gRPC message size is 4MB. When Mesh server
    # returns datasets for longer simulation intervals the gRPC message size
    # may exceed this limit. In such cases the user can set new limit using
    # `grpc_max_receive_message_length` when creating a connection to Mesh.
    connection = mesh.Connection.insecure(
        address,
        grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    )

    with connection.create_session() as session:
        start_time = datetime(2023, 11, 1)
        end_time = datetime(2023, 11, 2)

        print("running simulation...")

        try:
            for response in session.run_simulation(
                "Mesh",
                "Cases/Demo",
                start_time,
                end_time,
                return_datasets=True,
                resolution=timedelta(minutes=5),
            ):
                if isinstance(response, mesh.LogMessage):
                    print(
                        f"[{logging.getLevelName(response.level)}] {response.message}"
                    )
                elif isinstance(response, mesh.HydSimDataset):
                    print(
                        f"Received dataset {response.name} with {len(response.data)} bytes"
                    )
            print("done")
        except Exception as e:
            print(f"failed to run simulation: {e}")


async def async_run_simulation(address, tls_root_pem_cert):
    print("connecting...")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.aio.Connection.with_tls(
    #     address,
    #     tls_root_pem_cert,
    #     grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    # )

    # By default the maximum inbound gRPC message size is 4MB. When Mesh server
    # returns datasets for longer simulation intervals the gRPC message size
    # may exceed this limit. In such cases the user can set new limit using
    # `grpc_max_receive_message_length` when creating a connection to Mesh.
    connection = mesh.aio.Connection.insecure(
        address,
        grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    )

    async with connection.create_session() as session:
        start_time = datetime(2023, 11, 1)
        end_time = datetime(2023, 11, 2)

        print("running simulation...")

        try:
            async for response in session.run_simulation(
                "Mesh",
                "Cases/Demo",
                start_time,
                end_time,
                return_datasets=True,
                resolution=timedelta(minutes=5),
            ):
                if isinstance(response, mesh.LogMessage):
                    print(
                        f"[{logging.getLevelName(response.level)}] {response.message}"
                    )
                elif isinstance(response, mesh.HydSimDataset):
                    print(
                        f"Received dataset {response.name} with {len(response.data)} bytes"
                    )
            print("done")
        except Exception as e:
            print(f"failed to run simulation: {e}")


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    sync_run_simulation(address, tls_root_pem_cert)
    asyncio.run(async_run_simulation(address, tls_root_pem_cert))

Run inflow calculations

import asyncio
import logging
from datetime import datetime, timedelta

import helpers

import volue.mesh.aio
from volue import mesh

GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES = 10 * 1024 * 1024  # 10MB


def sync_run_inflow_calculation(address, tls_root_pem_cert):
    print("connecting...")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(
    #     address,
    #     tls_root_pem_cert,
    #     grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    # )

    # By default the maximum inbound gRPC message size is 4MB. When Mesh server
    # returns datasets for longer inflow calculation intervals the gRPC message
    # size may exceed this limit. In such cases the user can set new limit
    # using `grpc_max_receive_message_length` when creating a connection to Mesh.
    connection = mesh.Connection.insecure(
        address,
        grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    )

    with connection.create_session() as session:
        start_time = datetime(2021, 1, 1)
        end_time = datetime(2021, 1, 2)

        print("running inflow calculation...")

        try:
            for response in session.run_inflow_calculation(
                "Mesh",
                "Area",
                "WaterCourse",
                start_time,
                end_time,
                return_datasets=True,
                resolution=timedelta(minutes=5),
            ):
                if isinstance(response, mesh.LogMessage):
                    print(
                        f"[{logging.getLevelName(response.level)}] {response.message}"
                    )
                elif isinstance(response, mesh.HydSimDataset):
                    print(
                        f"Received dataset {response.name} with {len(response.data)} bytes"
                    )
            print("done")
        except Exception as e:
            print(f"failed to run inflow calculation: {e}")


async def async_run_inflow_calculation(address, tls_root_pem_cert):
    print("connecting...")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.aio.Connection.with_tls(
    #     address,
    #     tls_root_pem_cert,
    #     grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    # )

    # By default the maximum inbound gRPC message size is 4MB. When Mesh server
    # returns datasets for longer inflow calculation intervals the gRPC message
    # size may exceed this limit. In such cases the user can set new limit
    # using `grpc_max_receive_message_length` when creating a connection to Mesh.
    connection = mesh.aio.Connection.insecure(
        address,
        grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    )

    async with connection.create_session() as session:
        start_time = datetime(2021, 1, 1)
        end_time = datetime(2021, 1, 2)

        print("running inflow calculation...")

        try:
            async for response in session.run_inflow_calculation(
                "Mesh",
                "Area",
                "WaterCourse",
                start_time,
                end_time,
                return_datasets=True,
                resolution=timedelta(minutes=5),
            ):
                if isinstance(response, mesh.LogMessage):
                    print(
                        f"[{logging.getLevelName(response.level)}] {response.message}"
                    )
                elif isinstance(response, mesh.HydSimDataset):
                    print(
                        f"Received dataset {response.name} with {len(response.data)} bytes"
                    )
            print("done")
        except Exception as e:
            print(f"failed to run inflow calculation: {e}")


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    sync_run_inflow_calculation(address, tls_root_pem_cert)
    asyncio.run(async_run_inflow_calculation(address, tls_root_pem_cert))