HydSim

The Mesh Python SDK includes experimental functionality to run hydro simulations, run inflow calculations, and generate inputs for Marginal Cost using HydSim on the Mesh server. This functionality is under development and is therefore subject to change in future releases. Planned but not yet implemented functionality includes:

  • Selecting the resolution to use.

  • Retrieving datasets from the simulation. These are used by Volue for debugging.

The functionality is available through both a synchronous volue.mesh.Connection.Session and an asynchronous volue.mesh.aio.Connection.Session, using the methods documented below. Synchronous and asynchronous examples follow the API documentation.

API documentation

abstract Session.run_simulation(model: str, case: str, start_time: datetime, end_time: datetime, *, resolution: Optional[timedelta] = None, scenario: Optional[int] = None, return_datasets: bool = False) → Union[Iterator[None], AsyncIterator[None]]

Run a hydro simulation using HydSim on the Mesh server.

When running a simulation over a longer interval with return_datasets enabled, you might get a StatusCode.RESOURCE_EXHAUSTED error. See: gRPC communication.

This function is experimental and subject to major changes.

Parameters
  • model – The name of the Mesh model in which the simulation case exists.

  • case – The names of the case group and simulation case in the form ‘CaseGroup/CaseName’.

  • start_time – The (inclusive) start of the simulation interval.

  • end_time – The (exclusive) end of the simulation interval.

  • resolution – The resolution of the simulation. The default resolution of the simulation case is used if this is left as None. Officially supported resolutions are 5, 10, 15, and 60 minutes.

  • scenario – The scenario(s) to run. All scenarios are run if left as None, no scenarios are run if set as -1, and a specific numbered scenario is run if set as the number of that scenario.

  • return_datasets – Generate and return HydSim datasets that can be used by Volue to diagnose issues with hydro simulations. For performance reasons this should be false when not trying to diagnose an issue.
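The constraints in the parameter list can be checked on the client before a request is sent. The following is a minimal sketch, not part of the SDK: check_simulation_arguments is a hypothetical helper, and it is deliberately stricter than the server may be, since it rejects every resolution that is not listed as officially supported.

```python
from datetime import datetime, timedelta
from typing import Optional

# Resolutions listed as officially supported in the parameter description.
SUPPORTED_RESOLUTIONS = {timedelta(minutes=m) for m in (5, 10, 15, 60)}


def check_simulation_arguments(
    case: str,
    start_time: datetime,
    end_time: datetime,
    resolution: Optional[timedelta] = None,
) -> None:
    """Hypothetical helper: raise ValueError for arguments that look wrong."""
    # The case must name both the case group and the simulation case.
    group, separator, name = case.partition("/")
    if not separator or not group or not name:
        raise ValueError(f"case must look like 'CaseGroup/CaseName', got {case!r}")
    # The interval is [start_time, end_time), so it must not be empty.
    if start_time >= end_time:
        raise ValueError("start_time must be before end_time")
    # None means "use the default resolution of the simulation case".
    if resolution is not None and resolution not in SUPPORTED_RESOLUTIONS:
        raise ValueError(f"resolution {resolution} is not officially supported")
```

Calling the helper with "Cases/Demo", a one-day interval, and timedelta(minutes=5) returns without raising; passing "Demo" as the case raises ValueError.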

Returns

An iterator of None. In future versions this iterator will yield log messages, datasets, and potentially more. The simulation is done when the iterator is exhausted.

Exhausting the iterator without an exception does not guarantee that the simulation completed successfully. To determine that you must analyze the simulation’s result time series and the log messages from the server.
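As an illustration of the log analysis mentioned above, the sketch below exhausts a response iterator and keeps the text of every log message at warning level or above. It makes two assumptions: the LogMessage class is a stand-in defined here for the example, and its level is assumed to be comparable to the standard logging levels. The name collect_problems is hypothetical.

```python
import logging
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class LogMessage:
    """Stand-in for the SDK's log message type; only these two fields are assumed."""

    level: int
    message: str


def collect_problems(responses: Iterable[object]) -> List[str]:
    """Exhaust the iterator and return the text of warning-or-worse log messages."""
    problems = []
    for response in responses:
        # Skip None and any other item that is not a log message.
        if isinstance(response, LogMessage) and response.level >= logging.WARNING:
            problems.append(response.message)
    return problems
```

In real use the argument would be the iterator returned by session.run_simulation(...). An empty result does not prove success on its own; the result time series still have to be inspected.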

Raises

abstract Session.run_inflow_calculation(model: str, area: str, water_course: str, start_time: datetime, end_time: datetime, *, resolution: Optional[timedelta] = None, return_datasets: bool = False) → Union[Iterator[None], AsyncIterator[None]]

Run an inflow calculation using HydSim on the Mesh server.

When running an inflow calculation over a longer interval with return_datasets enabled, you might get a StatusCode.RESOURCE_EXHAUSTED error. See: gRPC communication.

Parameters
  • model – The name of the Mesh model in which the inflow calculation exists.

  • area – The area of the water course to calculate.

  • water_course – The water course to calculate.

  • start_time – The (inclusive) start of the calculation interval.

  • end_time – The (exclusive) end of the calculation interval.

  • resolution – The resolution of the calculation. The default resolution of the inflow calculation case is used if this is left as None. Officially supported resolutions are 5, 10, 15, and 60 minutes.

  • return_datasets – Generate and return HydSim datasets that can be used by Volue to diagnose issues with inflow calculations. For performance reasons this should be false when not trying to diagnose an issue.
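Because start_time is inclusive and end_time is exclusive, the number of resolution steps in an interval follows from simple arithmetic. The sketch below shows that calculation. count_steps is a hypothetical helper, and it is only an assumption that the server produces one value per step.

```python
from datetime import datetime, timedelta


def count_steps(start_time: datetime, end_time: datetime, resolution: timedelta) -> int:
    """Number of resolution-sized steps in the half-open interval [start_time, end_time)."""
    if resolution <= timedelta(0):
        raise ValueError("resolution must be positive")
    if end_time <= start_time:
        return 0
    # Ceiling division, so that a trailing partial step is still counted.
    return -(-(end_time - start_time) // resolution)
```

For the one-day interval from datetime(2021, 1, 1) to datetime(2021, 1, 2), this gives 288 steps at 5 minutes and 24 steps at 60 minutes.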

Returns

An iterator of None. In future versions this iterator will yield log messages, datasets, and potentially more. The calculation is done when the iterator is exhausted.

Exhausting the iterator without an exception does not guarantee that the calculation completed successfully. To determine that you must analyze the calculation’s result time series and the log messages from the server.
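With an asynchronous session the iterator has to be exhausted with async for. The following sketch shows the pattern without a server: fake_inflow_calculation is a hypothetical stand-in for session.run_inflow_calculation(...), and drain is a hypothetical helper that keeps every item that is not None.

```python
import asyncio
from typing import AsyncIterator, List


async def drain(responses: AsyncIterator[object]) -> List[object]:
    """Exhaust an async iterator and keep every non-None item it yields."""
    return [response async for response in responses if response is not None]


async def fake_inflow_calculation() -> AsyncIterator[object]:
    """Hypothetical stand-in for session.run_inflow_calculation(...)."""
    yield None
    await asyncio.sleep(0)  # Give control back to the event loop, as a real call would.
    yield "log: calculation finished"


if __name__ == "__main__":
    print(asyncio.run(drain(fake_inflow_calculation())))
```

The calculation is done once drain returns. As noted above, returning without an exception is not proof that the calculation succeeded.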

Raises

abstract Session.get_mc_file(model: str, case: str, start_time: datetime, end_time: datetime) → Union[Iterator[None], AsyncIterator[None]]

Generate Marginal Cost input using HydSim on the Mesh server.

Parameters
  • model – The name of the Mesh model in which the simulation case exists.

  • case – The names of the case group and simulation case in the form ‘CaseGroup/CaseName’.

  • start_time – The (inclusive) start of the simulation interval.

  • end_time – The (exclusive) end of the simulation interval.

Returns

An iterator of LogMessage objects followed by a single str. The final string is the Marginal Cost input.
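One way to consume such a mixed stream is to separate the log messages from the final string while iterating. This is a minimal sketch under stated assumptions: LogMessage is a stand-in defined for the example, and split_mc_output is a hypothetical helper, not an SDK function.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple


@dataclass
class LogMessage:
    """Stand-in for the SDK's log message type; only these two fields are assumed."""

    level: int
    message: str


def split_mc_output(
    responses: Iterable[object],
) -> Tuple[List[LogMessage], Optional[str]]:
    """Return the log messages and the Marginal Cost input (None if it never arrived)."""
    logs: List[LogMessage] = []
    mc_input: Optional[str] = None
    for response in responses:
        if isinstance(response, str):
            # The single string item is the Marginal Cost input.
            mc_input = response
        elif isinstance(response, LogMessage):
            logs.append(response)
    return logs, mc_input
```

In real use the argument would be the iterator returned by session.get_mc_file(...). A result of None for the second element means that no Marginal Cost input was received.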

Raises

Example of simulations

import asyncio
import logging
from datetime import datetime, timedelta

import helpers

import volue.mesh.aio
from volue import mesh

GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES = 10 * 1024 * 1024  # 10MB


def sync_run_simulation(address, tls_root_pem_cert):
    print("connecting...")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(
    #     address,
    #     tls_root_pem_cert,
    #     grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    # )

    # By default the maximum inbound gRPC message size is 4MB. When the Mesh
    # server returns datasets for longer simulation intervals, the gRPC message
    # size may exceed this limit. In that case, set a new limit using
    # `grpc_max_receive_message_length` when creating a connection to Mesh.
    connection = mesh.Connection.insecure(
        address,
        grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    )

    with connection.create_session() as session:
        start_time = datetime(2023, 11, 1)
        end_time = datetime(2023, 11, 2)

        print("running simulation...")

        try:
            for response in session.run_simulation(
                "Mesh",
                "Cases/Demo",
                start_time,
                end_time,
                return_datasets=True,
                resolution=timedelta(minutes=5),
            ):
                if isinstance(response, mesh.LogMessage):
                    print(
                        f"[{logging.getLevelName(response.level)}] {response.message}"
                    )
                elif isinstance(response, mesh.HydSimDataset):
                    print(
                        f"Received dataset {response.name} with {len(response.data)} bytes"
                    )
            print("done")
        except Exception as e:
            print(f"failed to run simulation: {e}")


async def async_run_simulation(address, tls_root_pem_cert):
    print("connecting...")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.aio.Connection.with_tls(
    #     address,
    #     tls_root_pem_cert,
    #     grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    # )

    # By default the maximum inbound gRPC message size is 4MB. When the Mesh
    # server returns datasets for longer simulation intervals, the gRPC message
    # size may exceed this limit. In that case, set a new limit using
    # `grpc_max_receive_message_length` when creating a connection to Mesh.
    connection = mesh.aio.Connection.insecure(
        address,
        grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    )

    async with connection.create_session() as session:
        start_time = datetime(2023, 11, 1)
        end_time = datetime(2023, 11, 2)

        print("running simulation...")

        try:
            async for response in session.run_simulation(
                "Mesh",
                "Cases/Demo",
                start_time,
                end_time,
                return_datasets=True,
                resolution=timedelta(minutes=5),
            ):
                if isinstance(response, mesh.LogMessage):
                    print(
                        f"[{logging.getLevelName(response.level)}] {response.message}"
                    )
                elif isinstance(response, mesh.HydSimDataset):
                    print(
                        f"Received dataset {response.name} with {len(response.data)} bytes"
                    )
            print("done")
        except Exception as e:
            print(f"failed to run simulation: {e}")


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    sync_run_simulation(address, tls_root_pem_cert)
    asyncio.run(async_run_simulation(address, tls_root_pem_cert))

Example of inflow calculations

import asyncio
import logging
from datetime import datetime, timedelta

import helpers

import volue.mesh.aio
from volue import mesh

GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES = 10 * 1024 * 1024  # 10MB


def sync_run_inflow_calculation(address, tls_root_pem_cert):
    print("connecting...")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(
    #     address,
    #     tls_root_pem_cert,
    #     grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    # )

    # By default the maximum inbound gRPC message size is 4MB. When the Mesh
    # server returns datasets for longer inflow calculation intervals, the gRPC
    # message size may exceed this limit. In that case, set a new limit using
    # `grpc_max_receive_message_length` when creating a connection to Mesh.
    connection = mesh.Connection.insecure(
        address,
        grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    )

    with connection.create_session() as session:
        start_time = datetime(2021, 1, 1)
        end_time = datetime(2021, 1, 2)

        print("running inflow calculation...")

        try:
            for response in session.run_inflow_calculation(
                "Mesh",
                "Area",
                "WaterCourse",
                start_time,
                end_time,
                return_datasets=True,
                resolution=timedelta(minutes=5),
            ):
                if isinstance(response, mesh.LogMessage):
                    print(
                        f"[{logging.getLevelName(response.level)}] {response.message}"
                    )
                elif isinstance(response, mesh.HydSimDataset):
                    print(
                        f"Received dataset {response.name} with {len(response.data)} bytes"
                    )
            print("done")
        except Exception as e:
            print(f"failed to run inflow calculation: {e}")


async def async_run_inflow_calculation(address, tls_root_pem_cert):
    print("connecting...")

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.aio.Connection.with_tls(
    #     address,
    #     tls_root_pem_cert,
    #     grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    # )

    # By default the maximum inbound gRPC message size is 4MB. When the Mesh
    # server returns datasets for longer inflow calculation intervals, the gRPC
    # message size may exceed this limit. In that case, set a new limit using
    # `grpc_max_receive_message_length` when creating a connection to Mesh.
    connection = mesh.aio.Connection.insecure(
        address,
        grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    )

    async with connection.create_session() as session:
        start_time = datetime(2021, 1, 1)
        end_time = datetime(2021, 1, 2)

        print("running inflow calculation...")

        try:
            async for response in session.run_inflow_calculation(
                "Mesh",
                "Area",
                "WaterCourse",
                start_time,
                end_time,
                return_datasets=True,
                resolution=timedelta(minutes=5),
            ):
                if isinstance(response, mesh.LogMessage):
                    print(
                        f"[{logging.getLevelName(response.level)}] {response.message}"
                    )
                elif isinstance(response, mesh.HydSimDataset):
                    print(
                        f"Received dataset {response.name} with {len(response.data)} bytes"
                    )
            print("done")
        except Exception as e:
            print(f"failed to run inflow calculation: {e}")


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    sync_run_inflow_calculation(address, tls_root_pem_cert)
    asyncio.run(async_run_inflow_calculation(address, tls_root_pem_cert))