Mesh Python SDK

The Mesh Python SDK can create a client which is able to communicate with a Mesh server using gRPC. A remote procedure call (RPC) is a way of sending requests and responses over a network. The request is serialized using protocol buffers (aka proto), packaged, and then sent to the server, which processes the request and sends back a response.

Depending on the request, the server may need a long time to process it. In such cases the client may use the package volue.mesh.aio, which is implemented using the asyncio library. It enables concurrency and lets Python perform other tasks while waiting for the response from the server.

This concept of concurrency can be demonstrated using the following examples. Notice the different order of the output in the two examples: 1, 2, A, B vs 1, A, 2, B.

Using volue.mesh.Connection:

import helpers

from volue.mesh import Connection


def get_version(connection):
    """Showing how to send get the server version."""
    print("1. Requesting server version")
    version = connection.get_version()
    print(f"2. Server version is {version.version}")


def start_and_end_session(session):
    """Showing how to start and end a session."""
    print("A. Starting session")
    session.open()
    print("B. Ending session")
    session.close()


def main(address, tls_root_pem_cert):
    """Showing how to connect to a server and run two tasks sequentially."""

    # For production environments, create a connection using with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    get_version(connection)
    start_and_end_session(connection.create_session())


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)
    print("Done")

# Outputs:
# 1. Requesting server version
# 2. Server version is 1.12.5.0-dev
# A. Starting session
# B. Ending session
# Done

Using volue.mesh.aio.Connection:

import asyncio

import helpers

from volue.mesh.aio import Connection


async def get_version(connection):
    """Showing how to get the server version."""
    print("1. Requesting server version")
    version = await connection.get_version()
    print(f"2. Server version is {version.version}")


async def start_and_end_session(session):
    """Showing how to start and end a session."""
    print("A. Starting session")
    await session.open()
    print("B. Ending session")
    await session.close()


async def main(address, tls_root_pem_cert):
    """Showing how to connect to a server and run two tasks concurrently."""
    # Creating a connection, but not sending any requests yet.

    # For production environments, create a connection using with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    # Indicate that these two functions can be run concurrently.
    await asyncio.gather(
        get_version(connection), start_and_end_session(connection.create_session())
    )


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    asyncio.run(main(address, tls_root_pem_cert))
    print("Done")

# Outputs:
# 1. Requesting server version
# A. Starting session
# 2. Server version is 1.12.5.0-dev
# B. Ending session
# Done

Because time series data can potentially be large, Apache Arrow is used to optimize memory sharing.
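
For illustration, here is a minimal sketch of accessing the Arrow representation of read points (the address and time series path are placeholders and error handling is omitted):

from datetime import datetime

from volue.mesh import Connection

connection = Connection.insecure("localhost:50051")  # placeholder address
with connection.create_session() as session:
    timeseries = session.read_timeseries_points(
        target="Model/SomeModel/SomeObject.TsRawAtt",  # placeholder time series path
        start_time=datetime(2016, 5, 1),
        end_time=datetime(2016, 5, 4),
    )
    # The points are exposed as a pyarrow.Table with columns utc_time, flags and value,
    # so they can be handed over to pandas or NumPy with minimal copying.
    print(timeseries.arrow_table.schema)
    print(timeseries.arrow_table.to_pandas())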

gRPC communication

By default gRPC limits the size of inbound messages to 4MB. On the Mesh Python SDK side, the user can change this limit when creating a connection to Mesh using the grpc_max_receive_message_length argument.

See:

  • volue.mesh.Connection.insecure()
  • volue.mesh.Connection.with_tls()
  • volue.mesh.Connection.with_kerberos()
  • volue.mesh.Connection.with_external_access_token()

Example usage:

connection = mesh.Connection.with_tls(
    address,
    tls_root_pem_cert,
    grpc_max_receive_message_length=10 * 1024 * 1024,  # 10MB
)

Another example of a connection using the grpc_max_receive_message_length argument can be found in run_simulation.py.

Note

gRPC outbound message size is not limited by default.

Increasing the grpc_max_receive_message_length limit might be useful when, for example, running long simulations with return_datasets enabled. In such cases the dataset size might exceed the 4MB limit and a RESOURCE_EXHAUSTED status code would be returned.
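
When the limit is hit, the error surfaces as a grpc.RpcError whose status code can be inspected. A minimal sketch (the call producing the large response is left abstract):

import grpc

try:
    ...  # a call whose response may exceed the inbound limit, e.g. a long simulation with return_datasets
except grpc.RpcError as e:
    if e.code() == grpc.StatusCode.RESOURCE_EXHAUSTED:
        # Either increase grpc_max_receive_message_length when creating the
        # connection, or request less data per call.
        print(f"Response too large: {e}")
    else:
        raise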

However, in other cases, like reading time series data, we suggest reading the data in chunks. For example, instead of reading 50 years of hourly time series data in a single request, the user should issue several read operations with shorter read intervals, as sketched below.
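
A minimal sketch of such chunked reading (the helper function, its parameters, and the chunk length are illustrative, not part of the SDK):

from datetime import timedelta

import pyarrow as pa


def read_in_chunks(session, target, start_time, end_time, chunk=timedelta(days=365)):
    """Read a long interval as several shorter requests to stay below the 4MB limit."""
    tables = []
    current = start_time
    while current < end_time:
        chunk_end = min(current + chunk, end_time)
        timeseries = session.read_timeseries_points(
            target=target, start_time=current, end_time=chunk_end
        )
        tables.append(timeseries.arrow_table)
        # Depending on how interval boundaries are treated, the chunk edges may
        # need adjusting to avoid duplicate points.
        current = chunk_end
    # Combine the chunks into a single Arrow table for further processing.
    return pa.concat_tables(tables)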

The same is true for writing data, like time series data. Here, however, it is not a suggestion but a requirement. The Mesh server gRPC inbound message size is not configurable and is always 4MB. If a gRPC client, like the Mesh Python SDK, sends a message that is too large, the request will be discarded. To avoid this, clients must send data in chunks, as sketched below.
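
A corresponding sketch for chunked writing (again illustrative; it assumes an Arrow table built with the Timeseries.schema, as in the example further below):

from volue.mesh import Timeseries


def write_in_chunks(session, full_name, arrow_table, max_points=200_000):
    """Write a large Arrow table as several smaller requests to stay below the 4MB limit."""
    for offset in range(0, arrow_table.num_rows, max_points):
        chunk = arrow_table.slice(offset, max_points)  # zero-copy slice of the table
        session.write_timeseries_points(Timeseries(table=chunk, full_name=full_name))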

Note

A single time series point occupies 20 bytes. To avoid exceeding the 4MB limit, a single read or write operation should therefore contain at most ~200k points (4MB / 20 bytes ≈ 200,000).

Date times and time zones

The Mesh Python SDK accepts either time zone naive (no time zone information provided) or time zone aware date time objects. All time zone naive date time objects are treated as UTC. Time zone aware date time objects are converted to UTC by the Mesh Python SDK before they are sent to the Mesh server.
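
A minimal illustration of the two accepted forms (both refer to the same UTC instant):

from datetime import datetime, timedelta, timezone

naive = datetime(2016, 5, 1, 12)  # no tzinfo, treated as 2016-05-01 12:00 UTC
aware = datetime(2016, 5, 1, 14, tzinfo=timezone(timedelta(hours=2)))  # converted to 12:00 UTC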

Note

Time series data returned as a PyArrow table always uses UTC to represent timestamps. The user has to convert the timestamps to a different time zone or format if needed.

Warning

As of PyArrow 7.0.0, the time zone information provided by dateutil.gettz is not supported. Please use datetime.timezone instead, e.g.:

from datetime import timedelta, timezone

some_tzinfo = timezone(timedelta(hours=-3))

Please refer to the example timeseries_operations.py to learn how to work with time zones. It is presented below:

from datetime import datetime

import grpc
import helpers
import pandas as pd
import pyarrow as pa
from dateutil import tz

from volue.mesh import Connection, Timeseries
from volue.mesh.calc import transform as Transform
from volue.mesh.calc.common import Timezone


def main(address, tls_root_pem_cert):
    """Showing how to find time series, write, read points from it and convert them to pandas format."""

    query = "*[.Name=SomePowerPlantChimney2].TsRawAtt"  # make sure only 1 time series is returned
    start_object_path = "Model/SimpleThermalTestModel/ThermalComponent"

    # For production environments, create a connection using with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        # first let's find a time series in our model
        try:
            timeseries_attributes = session.search_for_timeseries_attributes(
                start_object_path, query
            )
        except grpc.RpcError as e:
            print(f"Could not find time series attribute: {e}")
            return

        if len(timeseries_attributes) == 0:
            print("No such time series attribute in the given model/database")
            return

        print(f"Number of found time series: {len(timeseries_attributes)}")

        # pick the first time series and do some operations with it
        timeseries_attribute = timeseries_attributes[0]
        print("Working on timeseries with path: " + timeseries_attribute.path)

        # check for example the curve type of the connected physical time series
        print(f"Curve: {timeseries_attribute.time_series_resource.curve_type}")

        # now let's write some data to it
        try:
            # Mesh data is organized as an Arrow table with the following schema:
            # utc_time - [pa.timestamp('ms')] as a UTC Unix timestamp expressed in milliseconds
            # flags - [pa.uint32]
            # value - [pa.float64]

            number_of_points = 72
            timestamps = []
            values = []
            for i in range(0, number_of_points):
                hours = i % 24
                days = int(i / 24) + 1
                timestamps.append(
                    datetime(2016, 5, days, hours)
                )  # if no time zone is provided then the timestamp is treated as UTC
                values.append(days * 10)

            flags = [Timeseries.PointFlags.OK.value] * number_of_points

            arrays = [pa.array(timestamps), pa.array(flags), pa.array(values)]
            arrow_table = pa.Table.from_arrays(arrays, schema=Timeseries.schema)

            timeseries = Timeseries(
                table=arrow_table, full_name=timeseries_attribute.path
            )
            session.write_timeseries_points(timeseries)

        except grpc.RpcError as e:
            print(f"Could not write timeseries points: {e}")

        local_time_zone = tz.tzlocal()

        # now let's read from it
        try:
            # let's use the local time zone (read from operating system settings)
            start_time = datetime(2016, 5, 1, tzinfo=local_time_zone)
            end_time = datetime(2016, 5, 4, tzinfo=local_time_zone)

            timeseries_read = session.read_timeseries_points(
                target=timeseries_attribute, start_time=start_time, end_time=end_time
            )

            # convert to pandas format
            # the timestamps in PyArrow table are always returned in UTC format
            pandas_series = timeseries_read.arrow_table.to_pandas()

            # let's convert it back to the local time zone
            # first convert to a UTC time zone aware datetime object and then to the local time zone (set in the operating system)
            pandas_series["utc_time"] = pd.to_datetime(
                pandas_series["utc_time"], utc=True
            ).dt.tz_convert(local_time_zone)
            print(pandas_series)

            # notice that depending on the local time zone there is a shift in the data,
            # e.g. for the UTC+2 time zone the first 2 values will be NaN, because the points written
            # in the previous step used time zone naive datetime objects and were therefore treated as UTC.

            # do some further processing

        except grpc.RpcError as e:
            print(f"Could not read timeseries points: {e}")

        # now let's read transformations from it (transform to days)
        print("Transform resolution to days:")
        try:
            start_time = datetime(2016, 5, 1, tzinfo=local_time_zone)
            end_time = datetime(2016, 5, 4, tzinfo=local_time_zone)

            # Transform function may take optionally a time zone argument.
            # Refer to `transform` documentation for more details.
            # If you are using `LOCAL` or `STANDARD` time zone then make sure
            # the Mesh server is operating in the same time zone or adjust properly.
            transformed_timeseries = session.transform_functions(
                timeseries_attribute, start_time, end_time
            ).transform(Timeseries.Resolution.DAY, Transform.Method.SUM, Timezone.LOCAL)

            # convert to pandas format
            # the timestamps in PyArrow table are always returned in UTC format
            pandas_series = transformed_timeseries.arrow_table.to_pandas()
            print(pandas_series)

            # let's convert it back to the local time zone
            # first convert to a UTC time zone aware datetime object and then to the local time zone (set in the operating system)
            pandas_series["utc_time"] = pd.to_datetime(
                pandas_series["utc_time"], utc=True
            ).dt.tz_convert(local_time_zone)
            print(pandas_series)

            # do some further processing

        except Exception as e:
            print(f"Could not read transformed timeseries points: {e}")

        # optionally discard changes
        session.rollback()


if __name__ == "__main__":
    # This will search for a given time series, write some data,
    # read it and convert to pandas format.

    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)