Examples¶
This section contains code examples. Most of them require a specific Mesh model on the server side. Best practice is to copy the example scripts that are of interest to you, together with helpers.py.
Best practices¶
Check out (or open in GitHub) the git tag corresponding to the Mesh Python SDK version you are using. E.g. for Mesh Python SDK v1.7 it is: https://github.com/Volue-Public/energy-mesh-python/tree/v1.7.0
Copy either the whole examples directory or specific example script(s) together with helpers.py and paste them into your own workspace.
Run example:
python .\examples\get_version.py localhost:50051 c:\certificate.pem
First argument - Mesh server address with port. Default value is localhost:50051.
Second argument - path to PEM-encoded TLS certificate used by the Mesh server. Skip it if the Mesh server is configured to accept insecure gRPC connections. If provided, then make sure that volue.mesh.Connection.Session.with_tls() is used instead of volue.mesh.Connection.Session.insecure() to establish the connection to Mesh.
Note
Starting from Mesh Python SDK 1.9, in all examples the connection to Mesh
server is established using volue.mesh.Connection.Session.insecure()
.
To use a different connection type, e.g.: with TLS, the user has to change
the example script. The PEM-encoded TLS certificate passed as a second
argument will be discarded if an insecure connection is used.
Quickstart¶
import helpers
from volue.mesh import Connection
def main(address, tls_root_pem_cert):
    """Showing the quickest way to get started.

    Args:
        address: Mesh server address with port, e.g. 'localhost:50051'.
        tls_root_pem_cert: PEM-encoded TLS certificate info; only used by the
            secure connection variants shown in the comment below.
    """
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    # Which version is the server running
    version_info = connection.get_version()
    print(f"Connected to {version_info.name} {version_info.version}")

    # Create a remote session on the Volue Mesh server
    session = connection.create_session()
    session.open()
    print("You have now an open session and can request time series")

    # Close the remote session
    session.close()


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)
Authorization¶
import asyncio
import helpers
import volue.mesh.aio
from volue import mesh
def sync_auth(address, tls_root_pem_cert, service_principal, user_principal):
    """Authenticate with Kerberos using the synchronous API and print the user identity."""
    print("Synchronous authentication example: ")
    connection = mesh.Connection.with_kerberos(
        address, tls_root_pem_cert, service_principal, user_principal
    )
    user_identity = connection.get_user_identity()
    print(user_identity)
    # revoke no longer used token
    connection.revoke_access_token()
async def async_auth(address, tls_root_pem_cert, service_principal, user_principal):
    """Authenticate with Kerberos using the asyncio API and print the user identity."""
    print("Asynchronous authentication example:")
    connection = mesh.aio.Connection.with_kerberos(
        address, tls_root_pem_cert, service_principal, user_principal
    )
    user_identity = await connection.get_user_identity()
    print(user_identity)
    # revoke no longer used token
    await connection.revoke_access_token()
def main(address, tls_root_pem_cert):
    """Showing how to authorize to gRPC Mesh server."""
    # If Mesh gRPC server is running as a service user,
    # for example LocalSystem, NetworkService or a user account
    # with a registered service principal name then it is enough
    # to provide hostname as service principal, e.g.:
    #   'HOST/hostname.companyad.company.com'
    # If Mesh gRPC server is running as a user account without
    # registered service principal name then it is enough to provide
    # user account name running Mesh server as service principal, e.g.:
    #   'ad\\user.name' or r'ad\user.name'
    # Note: winkerberos converts service principal name if provided in
    # RFC-2078 format. '@' is converted to '/' if there is no '/'
    # character in the service principal name. E.g.:
    #   service@hostname
    # would be converted to:
    #   service/hostname
    service_principal = "HOST/hostname.companyad.company.com"
    user_principal = None

    sync_auth(address, tls_root_pem_cert, service_principal, user_principal)
    asyncio.run(
        async_auth(address, tls_root_pem_cert, service_principal, user_principal)
    )


if __name__ == "__main__":
    # This will authenticate Python client (receive authorization token from Mesh),
    # then send gRPC request that requires authorization (e.g.: GetUserIdentity)
    # and print the result. If your user name info is printed, you have successfully
    # communicated with the server.
    #
    # This requires Mesh server to be running with enabled TLS and Kerberos options.
    address, tls_root_pem_cert = helpers.get_connection_info()
    # main(address, tls_root_pem_cert)
Connect¶
import helpers
from volue.mesh import Connection
def get_version(connection):
    """Showing how to get the server version."""
    print("1. Requesting server version")
    server_version = connection.get_version().version
    print(f"2. Server version is {server_version}")
def start_and_end_session(session):
    """Showing how to start and end a session."""
    # Announce each step, then perform it.
    for message, action in (
        ("A. Starting session", session.open),
        ("B. Ending session", session.close),
    ):
        print(message)
        action()
def main(address, tls_root_pem_cert):
    """Showing how to connect to a server and run two tasks sequentially."""
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)
    get_version(connection)
    start_and_end_session(connection.create_session())


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)
    print("Done")

# Outputs:
# 1. Requesting server version
# 2. Server version is 1.12.5.0-dev
# A. Starting session
# B. Ending session
# Done
Connect, asynchronously¶
import asyncio
import helpers
from volue.mesh.aio import Connection
async def get_version(connection):
    """Showing how to get the server version."""
    print("1. Requesting server version")
    server_version = (await connection.get_version()).version
    print(f"2. Server version is {server_version}")
async def start_and_end_session(session):
    """Showing how to start and end a session."""
    # Announce each step, then await it.
    for message, action in (
        ("A. Starting session", session.open),
        ("B. Ending session", session.close),
    ):
        print(message)
        await action()
async def main(address, tls_root_pem_cert):
    """Showing how to connect to a server and run two tasks concurrently."""
    # Creating a connection, but not sending any requests yet.
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)
    # Indicate that these two functions can be run concurrently.
    await asyncio.gather(
        get_version(connection), start_and_end_session(connection.create_session())
    )


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    asyncio.run(main(address, tls_root_pem_cert))
    print("Done")

# Outputs (note the interleaving compared to the synchronous variant):
# 1. Requesting server version
# A. Starting session
# 2. Server version is 1.12.5.0-dev
# B. Ending session
# Done
Connect using external access token¶
from datetime import datetime
import helpers
from volue.mesh import Connection
def main(address, tls_root_pem_cert):
    """
    Showing how to authorize to gRPC Mesh server using externally obtained
    access token, e.g.: an OAuth JWT. Obtaining the access token is out of
    scope for this example.

    Depending on your environment, e.g.: Azure AD, using libraries like
    Microsoft Authentication Library (MSAL) for getting the tokens is
    suggested.
    """
    token = "my_token"
    connection = Connection.with_external_access_token(
        address, tls_root_pem_cert, access_token=token
    )

    with connection.create_session() as session:
        # Print user information contained in the access token.
        user_identity = connection.get_user_identity()
        print(user_identity)

        # Read some time series data.
        # This requires the user has time series read permissions.
        timeseries_key = 1388
        timeseries = session.read_timeseries_points(
            timeseries_key, datetime(2014, 1, 1), datetime(2015, 1, 1)
        )
        print(timeseries.arrow_table.to_pandas())

        # For long running sessions it may be necessary to refresh the access
        # token.
        # Other possibility would be to catch grpc.RpcError with status code
        # UNAUTHENTICATED and then get new access token and update it in the
        # Mesh connection using `update_external_access_token`.
        connection.update_external_access_token("my_new_access_token")


if __name__ == "__main__":
    # This requires Mesh server to be running with enabled TLS and OAuth options.
    # Obtaining access token is out of the scope for this example.
    address, tls_root_pem_cert = helpers.get_connection_info()
    # main(address, tls_root_pem_cert)
Get version¶
import asyncio
import helpers
import volue.mesh.aio
from volue import mesh
def sync_get_version(address, tls_root_pem_cert):
    """Request and print the Mesh server version using the synchronous API."""
    print("Synchronous get version:")
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(address, tls_root_pem_cert)
    connection = mesh.Connection.insecure(address)
    version_info = connection.get_version()
    print(version_info.version)
async def async_get_version(address, tls_root_pem_cert):
    """Request and print the Mesh server version using the asyncio API."""
    print("Asynchronous get version:")
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.aio.Connection.with_tls(address, tls_root_pem_cert)
    connection = mesh.aio.Connection.insecure(address)
    version_info = await connection.get_version()
    print(version_info.version)


if __name__ == "__main__":
    # This will request and print version info from the mesh server.
    # If some sensible version info is printed, you have successfully
    # communicated with the server.
    address, tls_root_pem_cert = helpers.get_connection_info()
    sync_get_version(address, tls_root_pem_cert)
    asyncio.run(async_get_version(address, tls_root_pem_cert))
Read time series¶
import uuid
from datetime import datetime
import helpers
from volue.mesh import Connection
def read_timeseries_points(session: Connection.Session):
    """Showing how to read time series points."""
    # Define the time series identifier, it can be:
    # - time series key of a physical time series
    # - path of a time series attribute that is connected to a physical time series
    # - ID of a time series attribute that is connected to a physical time series
    timeseries_key = 3
    timeseries_attribute_path = "Model/SimpleThermalTestModel/ThermalComponent/SomePowerPlant1/SomePowerPlantChimney2.TsRawAtt"
    timeseries_attribute_id = uuid.UUID("e5df77a9-8b60-4b0a-aa1b-3c3957c538a0")

    # Defining a time interval to read time series from.
    # If no time zone is provided then it will be treated as UTC.
    start = datetime(2016, 1, 1, 6, 0, 0)
    end = datetime(2016, 1, 1, 8, 0, 0)

    # Each time series point occupies 20 bytes. By default gRPC has a limitation of 4MB inbound message size.
    # In case of larger data volumes please send request data in chunks.
    # E.g.: call multiple times `read_timeseries_points` with shorter interval.

    # Send request to read time series based on time series key.
    timeseries = session.read_timeseries_points(
        target=timeseries_key, start_time=start, end_time=end
    )
    print(f"Read {timeseries.number_of_points} points using time series key.")
    print(timeseries.arrow_table.to_pandas())

    # Send requests to read time series based on time series attribute path.
    timeseries = session.read_timeseries_points(
        target=timeseries_attribute_path, start_time=start, end_time=end
    )
    print(
        f"Read {timeseries.number_of_points} points using time series attribute path."
    )
    print(timeseries.arrow_table.to_pandas())

    # Send requests to read time series based on time series attribute ID.
    # Attribute IDs are auto-generated when an object is created.
    # That is why we can't use any fixed ID in this example and the code is commented out.
    # (timeseries_attribute_id above is only a placeholder showing the expected type.)
    # timeseries = session.read_timeseries_points(
    #     target=timeseries_attribute_id, start_time=start, end_time=end
    # )
    # print(f"Read {timeseries.number_of_points} points using time series attribute ID.")
    # print(timeseries.arrow_table.to_pandas())
def main(address, tls_root_pem_cert):
    """Showing how to get time series points."""
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        read_timeseries_points(session)


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)
Read time series, asynchronously¶
import asyncio
import uuid
from datetime import datetime
import helpers
from volue.mesh.aio import Connection
async def read_timeseries_points_async(session: Connection.Session):
    """Showing how to read time series points."""
    # Define the time series identifier, it can be:
    # - time series key of a physical time series
    # - path of a time series attribute that is connected to a physical time series
    # - ID of a time series attribute that is connected to a physical time series
    timeseries_key = 3
    timeseries_attribute_path = "Model/SimpleThermalTestModel/ThermalComponent/SomePowerPlant1/SomePowerPlantChimney2.TsRawAtt"
    timeseries_attribute_id = uuid.UUID("e5df77a9-8b60-4b0a-aa1b-3c3957c538a0")

    # Defining a time interval to read time series from.
    # If no time zone is provided then it will be treated as UTC.
    start = datetime(2016, 1, 1, 6, 0, 0)
    end = datetime(2016, 1, 1, 8, 0, 0)

    # Each time series point occupies 20 bytes. By default gRPC has a limitation of 4MB inbound message size.
    # In case of larger data volumes please send request data in chunks.
    # E.g.: call multiple times `read_timeseries_points` with shorter interval.

    # Send request to read time series based on time series key.
    timeseries = await session.read_timeseries_points(
        target=timeseries_key, start_time=start, end_time=end
    )
    print(f"Read {timeseries.number_of_points} points using time series key.")
    print(timeseries.arrow_table.to_pandas())

    # Send requests to read time series based on time series attribute path.
    timeseries = await session.read_timeseries_points(
        target=timeseries_attribute_path, start_time=start, end_time=end
    )
    print(
        f"Read {timeseries.number_of_points} points using time series attribute path."
    )
    print(timeseries.arrow_table.to_pandas())

    # Send requests to read time series based on time series attribute ID.
    # Attribute IDs are auto-generated when an object is created.
    # That is why we can't use any fixed ID in this example and the code is commented out.
    # (timeseries_attribute_id above is only a placeholder showing the expected type.)
    # timeseries = await session.read_timeseries_points(
    #     target=timeseries_attribute_id, start_time=start, end_time=end
    # )
    # print(f"Read {timeseries.number_of_points} points using time series attribute ID.")
    # print(timeseries.arrow_table.to_pandas())
async def main(address, tls_root_pem_cert):
    """Showing how to get time series points asynchronously."""
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    async with connection.create_session() as session:
        await read_timeseries_points_async(session)


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    asyncio.run(main(address, tls_root_pem_cert))
Read and process time series, asynchronously¶
import asyncio
from datetime import datetime
import grpc
import helpers
from volue.mesh.aio import Connection
async def process_timeseries_values(arrow_table):
    """Simulate processing of the read points (sleeps ~8 seconds)."""
    # the processing can also be an async IO call
    # to e.g. separate service
    await asyncio.sleep(8)
    print(f"Processing completed - {len(arrow_table)} points were processed")
async def read_timeseries_points(session, path):
    """Read raw points for *path* over a fixed interval and return the Arrow table."""
    # The read can be a long running operation; with the asyncio API we can
    # switch to other work while waiting for it to complete
    # (e.g. doing processing for already returned time series).
    interval_start = datetime(2016, 5, 1)
    interval_end = datetime(2016, 5, 4)
    response = await session.read_timeseries_points(
        target=path, start_time=interval_start, end_time=interval_end
    )
    return response.arrow_table
async def handle_timeseries(session, path):
    """Read one time series and then process its points."""
    arrow_table = await read_timeseries_points(session, path)
    await process_timeseries_values(arrow_table)
async def main(address, tls_root_pem_cert):
    """
    Showing how to use asynchronous connection in a real-world scenario.
    First multiple time series are returned that match a given query.
    Then each time series is read and some processing is applied.

    Note: Mesh does not yet handle parallel requests, every request is handled
    sequentially.

    This example works with `SimpleThermalTestModel` test model, where 2 time
    series are returned for the given query. Assume reading time series takes
    10 seconds and processing (or some different computation, e.g.
    neural network inference based on that input) takes 8 seconds.
    When waiting for the read operation to complete for the second time series
    we can already start processing the first time series. See (illustrative
    timeline):

        Tr = 10s (reading)
        Tp = 8s (processing)

        Asynchronous code (with Mesh handling requests sequentially):
        | Tr1 || Tp1 |
              | Tr2 || Tp2 |
        ---------- 28s -------------

        Whereas for synchronous code:
        | Tr1 || Tp1 |
                     | Tr2 || Tp2 |
        --------------- 36s ---------------

    For 2 time series we save ~22% time.
    For 10 time series it would be 40% time (108s instead of 180s).

    Note: the processing could be also done using async IO
    (e.g. requests sent to a different service).
    """
    query = "*.TsRawAtt"
    start_object_path = "Model/SimpleThermalTestModel/ThermalComponent"

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    async with connection.create_session() as session:
        try:
            timeseries_attributes = await session.search_for_timeseries_attributes(
                target=start_object_path, query=query
            )
        except grpc.RpcError as e:
            print(f"Could not find timeseries attribute: {e}")
            return

        print(f"Number of found time series: {len(timeseries_attributes)}")

        # Read-and-process each found time series concurrently.
        await asyncio.gather(
            *(
                handle_timeseries(session, timeseries_attribute.path)
                for timeseries_attribute in timeseries_attributes
            )
        )


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    asyncio.run(main(address, tls_root_pem_cert))
Search for time series attributes¶
import helpers
from volue.mesh import Connection, OwnershipRelationAttribute, TimeseriesAttribute
def search_method_1(session: Connection.Session):
    """
    This method uses `search_for_timeseries_attributes` function.
    Wildcard search expression for attributes is not supported, meaning we
    can't get all time series attributes using simply "*" search expression.

    Useful e.g.: when searching for attributes with known names.
    """
    print("Search method 1")

    # Specify what you want to search for.

    # The Mesh object to start searching from
    start_object_path = "Model/SimpleThermalTestModel/ThermalComponent"
    # OR
    # start_object_guid = uuid.UUID("0000000b-0001-0000-0000-000000000000")  # ThermalComponent

    # The query expressed using Mesh search language syntax.
    # Traverse all children (*) of the start object, not case-sensitive ({}) and
    # accept all that has an attribute (.) called TsRawAtt
    query = "{*}.TsRawAtt"

    # Search for time series attributes using this query
    timeseries_attributes = session.search_for_timeseries_attributes(
        target=start_object_path, query=query
    )

    print(f"Number of found time series: {len(timeseries_attributes)}")
    for attribute in timeseries_attributes:
        if isinstance(attribute, TimeseriesAttribute):
            print(attribute.path)
def search_method_2(session: Connection.Session):
    """
    This method uses `search_for_objects` function with wildcard search
    expression ("*") that returns all objects.
    Useful when you want to traverse the complete model.

    Additionally here we show how to distinguish if time series attribute has
    a time series calculation expression defined or a physical time series
    connected to it.
    """
    print("Search method 2")

    # Provide root object/model as the start object to run the search.
    # Root objects/models do not contain any attributes.
    start_object_path = "Model/SimpleThermalTestModel"
    # Returns every object.
    query = "*"

    # Depending on the model size this may take long time to complete.
    objects = session.search_for_objects(target=start_object_path, query=query)
    print(f"Number of found objects: {len(objects)}")

    for object in objects:
        print(object.name)
        for attribute in object.attributes.values():
            if isinstance(attribute, TimeseriesAttribute):
                # If time series resource is set, then it means a physical
                # time series is connected to the attribute.
                if attribute.time_series_resource is not None:
                    print(
                        f"{attribute.path} has physical time series connected with time series key {attribute.time_series_resource.timeseries_key}"
                    )
def search_method_3(session: Connection.Session):
    """
    This method uses `get_object` function.
    Useful when you want to traverse a subset of the model from a given node,
    but not root object/model. Models do not contain attributes, so you won't
    be able to traverse the child objects using ownership attributes.
    """
    print("Search method 3")

    def traverse_child_objects(session: Connection.Session, target):
        # Full attribute info is needed so ownership relation attributes
        # expose their target object IDs for the recursive descent.
        object = session.get_object(target, full_attribute_info=True)

        for attribute in object.attributes.values():
            if isinstance(attribute, OwnershipRelationAttribute):
                for child_id in attribute.target_object_ids:
                    traverse_child_objects(session, child_id)
            if isinstance(attribute, TimeseriesAttribute):
                print(attribute.path)

    object_path = "Model/SimpleThermalTestModel/ThermalComponent/SomePowerPlant1"
    traverse_child_objects(session, object_path)
def main(address, tls_root_pem_cert):
    """Showing how to search for Mesh time series attributes in various ways."""
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    # Create a remote session on the Volue Mesh server.
    with connection.create_session() as session:
        search_method_1(session)
        search_method_2(session)
        search_method_3(session)


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)
Traverse model¶
import helpers
from volue.mesh import Connection, OwnershipRelationAttribute
# Leaf objects (objects without child objects) collected by the most recent
# call to `traverse_model_top_down`.
leaves = []


def traverse_model_top_down(session: Connection.Session, target, depth=0):
    """Traverses the Mesh model recursively."""
    object = session.get_object(target)
    # Indent by two dots per nesting level.
    print(f"{'..' * depth}{object.name}")
    leaf = True
    for attr in object.attributes.values():
        if isinstance(attr, OwnershipRelationAttribute):
            for child_id in attr.target_object_ids:
                leaf = False
                traverse_model_top_down(session, child_id, depth + 1)
    if leaf:
        leaves.append(object)
def traverse_model_bottom_up(session: Connection.Session, target, model):
    """Traverses the Mesh model from *target* up to the root *model*, printing each object."""
    object = session.get_object(target)
    # Derive the indentation level from the number of path separators.
    depth = object.path.count("/") - 1
    print(f"{'..' * depth}{object.name}")
    if object.owner_id == model.id:
        # Reached a direct child of the model: print the model name and stop.
        print(model.name)
        return
    # An object's owner is an attribute; that attribute's owner is the parent
    # object, which is the next step up in the traversal.
    attribute = session.get_attribute(object.owner_id)
    traverse_model_bottom_up(session, attribute.owner_id, model)
def main(address, tls_root_pem_cert):
    """Showing how to traverse Mesh model."""
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        models = session.list_models()
        for model in models:
            # Reset the module-level leaf collection for each model.
            leaves.clear()
            print(f"\nModel: '{model.name}'")

            print("Top-bottom traversal:")
            traverse_model_top_down(session, model.id)
            # Expected output:
            # Model
            # ..ChildObject1
            # ....SubChildObject1
            # ....SubChildObject2
            # ..ChildObject2

            print("\nBottom-top traversal:")
            traverse_model_bottom_up(session, leaves[0].id, model)
            # Expected output:
            # ....SubChildObject1
            # ..ChildObject1
            # Model


if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)
Using time series with pandas¶
from datetime import datetime
import grpc
import helpers
import pandas as pd
import pyarrow as pa
from dateutil import tz
from volue.mesh import Connection, Timeseries
from volue.mesh.calc import transform as Transform
from volue.mesh.calc.common import Timezone
def main(address, tls_root_pem_cert):
    """Showing how to find time series, write, read points from it and convert them to pandas format."""
    query = "*[.Name=SomePowerPlantChimney2].TsRawAtt"  # make sure only 1 time series is returned
    start_object_path = "Model/SimpleThermalTestModel/ThermalComponent"

    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)

    with connection.create_session() as session:
        # first lets find a time series in our model
        try:
            timeseries_attributes = session.search_for_timeseries_attributes(
                start_object_path, query
            )
        except grpc.RpcError as e:
            print(f"Could not find time series attribute: {e}")
            return

        if len(timeseries_attributes) == 0:
            print("No such time series attribute in the given model/database")
            return

        print(f"Number of found time series: {len(timeseries_attributes)}")

        # pick the first time series and do some operations with it
        timeseries_attribute = timeseries_attributes[0]
        print("Working on timeseries with path: " + timeseries_attribute.path)

        # check for example the curve type of the connected physical time series
        print(f"Curve: {timeseries_attribute.time_series_resource.curve_type}")

        # now lets write some data to it
        try:
            # Mesh data is organized as an Arrow table with the following schema:
            # utc_time - [pa.timestamp('ms')] as a UTC Unix timestamp expressed in milliseconds
            # flags - [pa.uint32]
            # value - [pa.float64]
            number_of_points = 72
            timestamps = []
            values = []
            for i in range(0, number_of_points):
                hours = i % 24
                days = int(i / 24) + 1
                timestamps.append(
                    datetime(2016, 5, days, hours)
                )  # if no time zone is provided then the timestamp is treated as UTC
                values.append(days * 10)

            flags = [Timeseries.PointFlags.OK.value] * number_of_points

            arrays = [pa.array(timestamps), pa.array(flags), pa.array(values)]
            arrow_table = pa.Table.from_arrays(arrays, schema=Timeseries.schema)

            timeseries = Timeseries(
                table=arrow_table, full_name=timeseries_attribute.path
            )
            session.write_timeseries_points(timeseries)

        except grpc.RpcError as e:
            print(f"Could not write timeseries points: {e}")

        local_time_zone = tz.tzlocal()

        # now lets read from it
        try:
            # lets use local time zone (read from operating system settings)
            start_time = datetime(2016, 5, 1, tzinfo=local_time_zone)
            end_time = datetime(2016, 5, 4, tzinfo=local_time_zone)
            timeseries_read = session.read_timeseries_points(
                target=timeseries_attribute, start_time=start_time, end_time=end_time
            )

            # convert to pandas format
            # the timestamps in PyArrow table are always returned in UTC format
            pandas_series = timeseries_read.arrow_table.to_pandas()
            # lets convert it back to local time zone
            # first convert to UTC time zone aware datetime object and then to local time zone (set in operating system)
            pandas_series["utc_time"] = pd.to_datetime(
                pandas_series["utc_time"], utc=True
            ).dt.tz_convert(local_time_zone)
            print(pandas_series)

            # notice that depending on the local time zone there is a shift in the data
            # e.g. for UTC+2 time zone, first 2 values will be NaN, because writing time series points in the previous step
            # is using time zone naive datetime object, so they are treated as UTC.

            # do some further processing

        except grpc.RpcError as e:
            print(f"Could not read timeseries points: {e}")

        # now lets read transformations from it (transform to days)
        print("Transform resolution to days:")
        try:
            start_time = datetime(2016, 5, 1, tzinfo=local_time_zone)
            end_time = datetime(2016, 5, 4, tzinfo=local_time_zone)

            # Transform function may take optionally a time zone argument.
            # Refer to `transform` documentation for more details.
            # If you are using `LOCAL` or `STANDARD` time zone then make sure
            # the Mesh server is operating in the same time zone or adjust properly.
            transformed_timeseries = session.transform_functions(
                timeseries_attribute, start_time, end_time
            ).transform(Timeseries.Resolution.DAY, Transform.Method.SUM, Timezone.LOCAL)

            # convert to pandas format
            # the timestamps in PyArrow table are always returned in UTC format
            pandas_series = transformed_timeseries.arrow_table.to_pandas()
            print(pandas_series)
            # lets convert it back to local time zone
            # first convert to UTC time zone aware datetime object and then to local time zone (set in operating system)
            pandas_series["utc_time"] = pd.to_datetime(
                pandas_series["utc_time"], utc=True
            ).dt.tz_convert(local_time_zone)
            print(pandas_series)

            # do some further processing

        except Exception as e:
            print(f"Could not read transformed timeseries points: {e}")

        # optionally discard changes
        session.rollback()


if __name__ == "__main__":
    # This will search for a given time series, write some data,
    # read it and convert to pandas format.
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)
Working with link relations¶
from datetime import datetime, timedelta
import helpers
from dateutil import tz
from volue.mesh import Connection, LinkRelationVersion, VersionedLinkRelationAttribute
# Paths to the example object and its link relation attributes in the
# SimpleThermalTestModel test model.
OBJECT_PATH = "Model/SimpleThermalTestModel/ThermalComponent.ThermalPowerToPlantRef/SomePowerPlant1"
ONE_TO_ONE_PATH = OBJECT_PATH + ".SimpleReference"
ONE_TO_MANY_PATH = OBJECT_PATH + ".PlantToChimneyRefCollection"
VERSIONED_ONE_TO_ONE_PATH = OBJECT_PATH + ".ReferenceSeriesAtt"
VERSIONED_ONE_TO_MANY_PATH = OBJECT_PATH + ".ReferenceSeriesCollectionAtt"

# Local time zone read from the operating system settings.
LOCAL_TIME_ZONE = tz.tzlocal()
def one_to_one_link_relation_example(session: Connection.Session):
    """Show reading and updating a one-to-one link relation attribute."""
    attribute_path = ONE_TO_ONE_PATH
    print("\nOne-to-one link relation")
    print("------------------------")

    # First read the attribute using `get_attribute`.
    # Full attribute information is needed to get target object type name.
    attribute = session.get_attribute(attribute_path, full_attribute_info=True)
    print(
        f"\tLink relation attribute points to {attribute.target_object_ids} of type {attribute.definition.target_object_type_name}"
    )

    # Get more information on the target object the link relation points to.
    # The link relation can be potentially nullable, check if there is any
    # target object the link relation points to.
    if len(attribute.target_object_ids) > 0:
        target_object = session.get_object(attribute.target_object_ids[0])
        print(f"\tTarget object path is: {target_object.path}")

    # Change the target object.
    new_target_object_path = OBJECT_PATH + ".PlantToChimneyRef/SomePowerPlantChimney1"
    new_target_object = session.get_object(new_target_object_path)
    session.update_link_relation_attribute(attribute, [new_target_object.id])

    # Read the updated attribute.
    attribute = session.get_attribute(attribute_path, full_attribute_info=True)
    print(f"\tUpdated link relation attribute points to {attribute.target_object_ids}")
def one_to_many_link_relation_example(session: Connection.Session):
    """Read, clear, replace and append targets of a one-to-many link relation."""
    attribute_path = ONE_TO_MANY_PATH
    print("\nOne-to-many link relation")
    print("-------------------------")
    # First read the attribute using `get_attribute`.
    # Full attribute information is needed to get target object type name.
    attribute = session.get_attribute(attribute_path, full_attribute_info=True)
    print(
        f"\tLink relation attribute points to {attribute.target_object_ids} of type {attribute.definition.target_object_type_name}"
    )
    # Get more information on the target objects the link relation points to.
    # The link relation can be potentially empty, in which case the loop
    # below simply prints nothing.
    print("\tTarget object paths:")
    for index, target_object_id in enumerate(attribute.target_object_ids, 1):
        target_object = session.get_object(target_object_id)
        print(f"\t{index}. {target_object.path}")
    # Remove all target objects.
    session.update_link_relation_attribute(attribute, [])
    # Read the updated attribute.
    attribute = session.get_attribute(attribute_path, full_attribute_info=True)
    print(f"\tUpdated link relation attribute points to {attribute.target_object_ids}")
    # Find target object for the new version.
    objects = session.search_for_objects(
        "Model/SimpleThermalTestModel",
        query=f"*[.Type={attribute.definition.target_object_type_name}&&.Name=SomePowerPlantChimney1]",
    )
    if len(objects) != 1:
        raise RuntimeError(
            f"invalid result from 'search_for_objects', "
            f"expected 1 target object, but got {len(objects)}"
        )
    new_target_object_1 = objects[0]
    # Set a new target object.
    # Updating one-to-many link relation without setting `append` flag will
    # replace all already existing target objects with the new ones.
    session.update_link_relation_attribute(attribute, [new_target_object_1.id])
    # Add a new target object.
    # Now use `append` flag to preserve already existing target objects.
    new_target_object_2_path = OBJECT_PATH + ".PlantToChimneyRef/SomePowerPlantChimney2"
    new_target_object_2 = session.get_object(new_target_object_2_path)
    session.update_link_relation_attribute(
        attribute, [new_target_object_2.id], append=True
    )
    # Read the updated attribute.
    attribute = session.get_attribute(attribute_path, full_attribute_info=True)
    print(f"\tUpdated link relation attribute points to {attribute.target_object_ids}")
def get_versioned_link_relation_attribute_information(
    attribute: VersionedLinkRelationAttribute, session: Connection.Session
):
    """Create a printable message from a versioned link relation attribute."""
    lines = []
    epoch = datetime(1970, 1, 1, tzinfo=tz.UTC)
    for entry_index, entry in enumerate(attribute.entries, 1):
        lines.append(f"Entry {entry_index}\n")
        for version_index, version in enumerate(entry.versions, 1):
            # Resolve the target object's name, or a placeholder when the
            # version points at no object.
            if version.target_object_id:
                target_object_name = session.get_object(version.target_object_id).name
            else:
                target_object_name = "<EMPTY>"
            # If running on Windows and the datetime is before epoch
            # (1.1.1970 UTC) then we can't use `astimezone` because Windows
            # `localtime()` does not support it, see discussion:
            # https://bugs.python.org/issue31327
            if version.valid_from_time < epoch:
                # Pre-epoch: format as-is, including time zone info.
                valid_from_time_str = f"{version.valid_from_time:%Y-%m-%dT%H:%M:%S %Z}"
            else:
                # Otherwise convert to the local time zone.
                valid_from_time_str = (
                    f"{version.valid_from_time.astimezone(LOCAL_TIME_ZONE)}"
                )
            lines.append(
                f"\tVersion {version_index}. "
                f"target object name: {target_object_name}, "
                f"valid from time: {valid_from_time_str}\n"
            )
    return "".join(lines)
def versioned_one_to_one_link_relation_example(session: Connection.Session):
    """Read, remove and add versions of a versioned one-to-one link relation."""
    attribute_path = VERSIONED_ONE_TO_ONE_PATH
    print("\nVersioned one-to-one link relation")
    print("----------------------------------")
    attribute = session.get_attribute(attribute_path)
    print(get_versioned_link_relation_attribute_information(attribute, session))
    # Remove the first version in entry.
    if len(attribute.entries) > 0 and len(attribute.entries[0].versions) > 0:
        session.update_versioned_one_to_one_link_relation_attribute(
            attribute_path,
            start_time=datetime.min,
            # Replacement interval end time is exclusive,
            # i.e.: [start_time, end_time).
            # That is why we need to add some small time fraction to make
            # sure the last version's `valid_from_time` is within the
            # replacement interval.
            end_time=attribute.entries[0].versions[0].valid_from_time
            + timedelta(microseconds=1),
            new_versions=[],
        )
    # Add a new version in entry.
    new_target_object_path = OBJECT_PATH + ".PlantToChimneyRef/SomePowerPlantChimney1"
    new_target_object = session.get_object(new_target_object_path)
    new_link_relation_version_1 = LinkRelationVersion(
        target_object_id=new_target_object.id,
        # Time zone is set explicitly here; if no time zone were provided
        # the datetime would be treated as UTC.
        valid_from_time=datetime(2022, 1, 1, tzinfo=LOCAL_TIME_ZONE),
    )
    # Add another one, this time with empty target object.
    new_link_relation_version_2 = LinkRelationVersion(
        target_object_id=None,
        # Time zone is set explicitly here; if no time zone were provided
        # the datetime would be treated as UTC.
        valid_from_time=datetime(2025, 1, 1, tzinfo=LOCAL_TIME_ZONE),
    )
    session.update_versioned_one_to_one_link_relation_attribute(
        attribute_path,
        start_time=new_link_relation_version_1.valid_from_time,
        end_time=datetime.max,
        new_versions=[new_link_relation_version_1, new_link_relation_version_2],
    )
    # Read the updated attribute.
    attribute = session.get_attribute(attribute_path)
    print("Updated link relation attribute:")
    print(get_versioned_link_relation_attribute_information(attribute, session))
def versioned_one_to_many_link_relation_example(session: Connection.Session):
    """Read and replace all entries of a versioned one-to-many link relation."""
    attribute_path = VERSIONED_ONE_TO_MANY_PATH
    print("\nVersioned one-to-many link relation")
    print("-----------------------------------")
    attribute = session.get_attribute(attribute_path)
    print(get_versioned_link_relation_attribute_information(attribute, session))
    new_target_object_1_path = OBJECT_PATH + ".PlantToChimneyRef/SomePowerPlantChimney1"
    new_target_object_1 = session.get_object(new_target_object_1_path)
    new_target_object_2_path = OBJECT_PATH + ".PlantToChimneyRef/SomePowerPlantChimney2"
    new_target_object_2 = session.get_object(new_target_object_2_path)
    entry_1_version_1 = LinkRelationVersion(
        target_object_id=new_target_object_1.id,
        # Time zone is set explicitly here; if no time zone were provided
        # the datetime would be treated as UTC.
        valid_from_time=datetime(2022, 1, 1, tzinfo=LOCAL_TIME_ZONE),
    )
    entry_1_version_2 = LinkRelationVersion(
        target_object_id=None,
        # Empty target object; explicit local time zone as above.
        valid_from_time=datetime(2025, 1, 1, tzinfo=LOCAL_TIME_ZONE),
    )
    entry_1_version_3 = LinkRelationVersion(
        target_object_id=new_target_object_1.id,
        # Explicit local time zone as above.
        valid_from_time=datetime(2027, 1, 1, tzinfo=LOCAL_TIME_ZONE),
    )
    entry1 = [entry_1_version_1, entry_1_version_2, entry_1_version_3]
    entry_2_version_1 = LinkRelationVersion(
        target_object_id=new_target_object_2.id,
        # Naive datetime: no time zone provided, so it is treated as UTC.
        valid_from_time=datetime(2000, 1, 1),
    )
    entry_2_version_2 = LinkRelationVersion(
        target_object_id=None,
        # Naive datetime: no time zone provided, so it is treated as UTC.
        valid_from_time=datetime(2010, 1, 1),
    )
    entry2 = [entry_2_version_1, entry_2_version_2]
    new_entries = [entry1, entry2]
    # Replace all existing entries with the two new ones.
    session.update_versioned_one_to_many_link_relation_attribute(
        attribute_path, new_entries
    )
    # Read the updated attribute.
    attribute = session.get_attribute(attribute_path)
    print("Updated link relation attribute:")
    print(get_versioned_link_relation_attribute_information(attribute, session))
def main(address, tls_root_pem_cert):
    """Run all link relation examples within a single Mesh session."""
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)
    with connection.create_session() as session:
        examples = (
            one_to_one_link_relation_example,
            one_to_many_link_relation_example,
            versioned_one_to_one_link_relation_example,
            versioned_one_to_many_link_relation_example,
        )
        for example in examples:
            example(session)
        # to commit your changes you need to call:
        # session.commit()
if __name__ == "__main__":
    args = helpers.get_connection_info()
    main(*args)
Working with model (objects and attributes)¶
import helpers
from volue.mesh import AttributesFilter, Connection, OwnershipRelationAttribute
def main(address, tls_root_pem_cert):
    """Create, inspect, update and delete an object and its attributes."""
    root_object_path = "Model/SimpleThermalTestModel/ThermalComponent"
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)
    with connection.create_session() as session:
        # Root object has an ownership relation attribute that points to objects
        # of "PlantElementType" type. We want to add new object of this type.
        root_object = session.get_object(root_object_path, full_attribute_info=True)
        # First get the ownership attribute:
        # If we know the name of the attribute we can use:
        ownership_relation_attribute = root_object.attributes["ThermalPowerToPlantRef"]
        # If we don't know the name, but only the type of the target object
        # (the object it points to):
        # Note: This requires `full_attribute_info` flag set to True when
        # calling `get_object` or `get_attribute`.
        for attribute in root_object.attributes.values():
            if (
                isinstance(attribute, OwnershipRelationAttribute)
                and attribute.definition.target_object_type_name == "PlantElementType"
            ):
                ownership_relation_attribute = attribute
        new_object = session.create_object(
            ownership_relation_attribute, "SomeNewPowerPlant"
        )
        print(f"New object created: {new_object.path}")
        int_attribute = new_object.attributes["Int64Att"]
        # Object returned by `create_object` contains all attributes with basic
        # information.
        print("One of the new object's attributes (basic information):")
        print(int_attribute)
        # Now let's update attribute's value.
        session.update_simple_attribute(int_attribute, 100)
        # Check updated value, but this time get also more information like
        # attribute definition. Use either:
        # - `get_object` with `full_attribute_info` flag
        # - `get_attribute` with `full_attribute_info` flag
        attribute_with_full_info = session.get_attribute(
            int_attribute, full_attribute_info=True
        )
        print("One of the new object's attributes (full information):")
        print(attribute_with_full_info)
        # We can also filter attributes by e.g. name.
        object_with_filtered_attributes = session.get_object(
            new_object,
            attributes_filter=AttributesFilter(name_mask=["DblAtt", "StringAtt"]),
        )
        print(
            f"Filtered attributes count: {len(object_with_filtered_attributes.attributes)}"
        )
        # Now let's change the object name.
        session.update_object(new_object, new_name="NewNamePowerPlant")
        print("Object's name changed.")
        # Delete the updated object.
        # Because we changed object's name, we can't provide old `Object`
        # instance as it is still having the old object name.
        # Use object's ID instead or provide the path to the object explicitly.
        session.delete_object(new_object.id)
        print("Object deleted.")
        # To commit your changes you need to call:
        # session.commit()
if __name__ == "__main__":
    args = helpers.get_connection_info()
    main(*args)
Working with rating curves¶
from datetime import datetime
import helpers
from volue.mesh import Connection, RatingCurveSegment, RatingCurveVersion
def main(address, tls_root_pem_cert):
    """Read, update and replace rating curve versions on an attribute."""
    rating_curve_attribute_path = "Model/SimpleThermalTestModel/ThermalComponent.ThermalPowerToPlantRef/SomePowerPlant1.RatingCurveAtt"
    # Defining a time interval to read rating curve versions from.
    # If no time zone is provided then it will be treated as UTC.
    start_time = datetime(2008, 1, 1)
    end_time = datetime(2022, 1, 1)
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)
    with connection.create_session() as session:
        # First read the attribute using `get_attribute`.
        # We can get standard information like name, ID, tags, etc.
        rating_curve_attribute = session.get_attribute(
            rating_curve_attribute_path, full_attribute_info=True
        )
        print(
            f"Basic information about the rating curve attribute: {rating_curve_attribute}\n"
        )
        # Because the rating curve can potentially contain large amounts of data,
        # specialized methods exist to handle those values.
        versions = session.get_rating_curve_versions(
            target=rating_curve_attribute_path, start_time=start_time, end_time=end_time
        )
        print(
            (
                "Rating curve versions for time interval: "
                f"{start_time.strftime('%d.%m.%Y')} - {end_time.strftime('%d.%m.%Y')}:"
            )
        )
        for i, version in enumerate(versions):
            print(f"Version {i+1}:\n{version}")
        # Nothing to update if no versions (or no segments) exist.
        if len(versions) == 0 or len(versions[-1].x_value_segments) == 0:
            print("No rating curve versions found. Skip update.")
            return
        # Now for the last version update first segment and add a new one.
        versions[-1].x_value_segments[0].factor_b = -3.2
        versions[-1].x_value_segments.append(RatingCurveSegment(25, -1.1, 2.2, -3.3))
        session.update_rating_curve_versions(
            target=rating_curve_attribute_path,
            start_time=versions[
                0
            ].valid_from_time,  # use first version `valid_from_time` as start interval
            end_time=datetime.max,  # this is the last version
            new_versions=versions,
        )
        # Read once again.
        versions = session.get_rating_curve_versions(
            target=rating_curve_attribute_path, start_time=start_time, end_time=end_time
        )
        print("Updated rating curve versions:")
        for i, version in enumerate(versions):
            print(f"Version {i+1}:\n{version}")
        # Now let's replace the last version with a new one.
        new_segments = [
            RatingCurveSegment(10, 1.1, 1.9, -3.5),
            RatingCurveSegment(21, 2.1, 1.8, -3.2),
        ]
        new_version = RatingCurveVersion(
            valid_from_time=datetime(2010, 6, 1),
            x_range_from=5.0,
            x_value_segments=new_segments,
        )
        session.update_rating_curve_versions(
            target=rating_curve_attribute_path,
            start_time=versions[
                -1
            ].valid_from_time,  # to replace old version use its `valid_from_time` as start interval
            end_time=datetime.max,  # this is the last version
            new_versions=[new_version],
        )
        # Read once again.
        versions = session.get_rating_curve_versions(
            target=rating_curve_attribute_path, start_time=start_time, end_time=end_time
        )
        print("Updated for the second time rating curve versions:")
        for i, version in enumerate(versions):
            print(f"Version {i+1}:\n{version}")
        # to commit your changes you need to call:
        # session.commit()
if __name__ == "__main__":
    args = helpers.get_connection_info()
    main(*args)
Working with sessions¶
import uuid
import helpers
from volue.mesh import Connection
def main(address, tls_root_pem_cert):
    """Showing different ways of working with sessions."""
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    connection = Connection.insecure(address)
    # 1. Create a session, open and close session.
    session = connection.create_session()
    session.open()
    print("1. You now have a new open session")
    session.close()
    # 2. Create session using the with statement.
    # Session will be created, opened and closed within the 'with' statement scope
    with connection.create_session() as session:
        print("2. You now have a new open session")
    # 3. Connecting to a potentially open session.
    # Session ids can be found in the session object:
    #   session.session_id
    # Note: If the session_id is not the id of an open session,
    # the server will create a new one for you.
    # Set the session id you want to connect to
    session_id = uuid.UUID("123e4567-e89b-12d3-a456-556642440000")
    print(f"3. Session id you want to connect to: {session_id}")
    session = connection.connect_to_session(session_id)
    # Try connecting to that session id, if it does not exist, a new one will be created without warning
    session.open()
    print(
        "3. You have now an open session, either the one you requested or a new one if it did not exist"
    )
    # Check which session you are connected to and close it.
    # (Message fixed: it previously read "you are to connect to".)
    print(f"3. Session id you are connected to: {session.session_id}")
    session.close()
if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    main(address, tls_root_pem_cert)
Working with XY sets¶
import datetime
import helpers
from volue import mesh
# Path to the example object holding the XY-set attributes.
OBJECT_PATH = "Model/SimpleThermalTestModel/ThermalComponent.ThermalPowerToPlantRef/SomePowerPlant1"
# Unversioned XY-set attribute.
UNVERSIONED_PATH = OBJECT_PATH + ".XYSetAtt"
# Versioned XY-set (XYZ series) attribute.
VERSIONED_PATH = OBJECT_PATH + ".XYZSeriesAtt"
def main(address, tls_root_pem_cert):
    """Read and update both unversioned and versioned XY-set attributes."""
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(address, tls_root_pem_cert)
    connection = mesh.Connection.insecure(address)
    with connection.create_session() as session:
        # In the default test model both the versioned and the unversioned
        # attribute are initially empty. The following two calls are therefore
        # expected to return [].
        print(f"Getting XY-sets from {UNVERSIONED_PATH}")
        xy_sets = session.get_xy_sets(UNVERSIONED_PATH)
        print(xy_sets)
        start_time = datetime.datetime.fromisoformat("1960-01-01")
        end_time = datetime.datetime.fromisoformat("2030-01-01")
        print(
            f"Getting XY-sets in interval [{start_time}, {end_time}) from {VERSIONED_PATH}"
        )
        xy_sets = session.get_xy_sets(VERSIONED_PATH, start_time, end_time)
        print(f"Received: {xy_sets}")
        # One XY-set with a single curve. The first positional argument is
        # `valid_from_time`; it is None here and only set later when writing
        # to the versioned attribute.
        sample_xy_set = mesh.XySet(
            None, [mesh.XyCurve(0.0, [(1.0, 10.3), (2.5, 15.9)])]
        )
        print(f"Updating XY-set at {UNVERSIONED_PATH}")
        session.update_xy_sets(UNVERSIONED_PATH, new_xy_sets=[sample_xy_set])
        print("Updated")
        print(
            f"Replacing XY-set versions in interval [{start_time}, {end_time}) with one version at {start_time}"
        )
        sample_xy_set.valid_from_time = start_time
        session.update_xy_sets(VERSIONED_PATH, start_time, end_time, [sample_xy_set])
        print("Updated")
        # Read back both attributes to show the updates took effect.
        print(f"Getting XY-sets from {UNVERSIONED_PATH}")
        xy_sets = session.get_xy_sets(UNVERSIONED_PATH)
        print(f"Received: {xy_sets}")
        print(
            f"Getting XY-sets in interval [{start_time}, {end_time}) from {VERSIONED_PATH}"
        )
        xy_sets = session.get_xy_sets(VERSIONED_PATH, start_time, end_time)
        print(f"Received: {xy_sets}")
if __name__ == "__main__":
    args = helpers.get_connection_info()
    main(*args)
Write time series¶
import asyncio
import uuid
from datetime import datetime
import helpers
import pandas as pd
import pyarrow as pa
from dateutil import tz
import volue.mesh.aio
from volue import mesh
# Define the time series identifier, it can be:
# - time series key of a physical time series
# - path of a time series attribute that is connected to a physical time series
# - ID of a time series attribute that is connected to a physical time series
timeseries_key = 3
timeseries_attribute_path = "Model/SimpleThermalTestModel/ThermalComponent/SomePowerPlant1/SomePowerPlantChimney2.TsRawAtt"
# Attribute IDs are auto-generated per object, so this fixed ID is expected
# to fail at runtime (the write-by-ID example demonstrates the error path).
timeseries_attribute_id = uuid.UUID("e5df77a9-8b60-4b0a-aa1b-3c3957c538a0")
def get_pa_table_with_time_series_points() -> pa.Table:
    """Build a 3-point Arrow table matching the Mesh time series schema.

    Mesh data is organized as an Arrow table with the following schema:
      utc_time - [pa.timestamp('ms')] as a UTC Unix timestamp expressed in milliseconds
      flags - [pa.uint32]
      value - [pa.float64]
    """
    # time_zone = tz.gettz("Europe/Warsaw")
    time_zone = tz.UTC
    # If no time zone is provided then the timestamp is treated as UTC.
    timestamps = pd.date_range(
        datetime(2016, 1, 1, 1, tzinfo=time_zone), periods=3, freq="1h"
    )
    flags = [mesh.Timeseries.PointFlags.OK.value] * 3
    values = [0.0, 10.0, 1000.0]
    columns = [pa.array(timestamps), pa.array(flags), pa.array(values)]
    return pa.Table.from_arrays(columns, schema=mesh.Timeseries.schema)
def sync_write_timeseries_points(address, tls_root_pem_cert):
    """Write, remove and read back time series points (synchronous API)."""
    print("Synchronous write time series points:")
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(address, tls_root_pem_cert)
    connection = mesh.Connection.insecure(address)
    with connection.create_session() as session:
        table = get_pa_table_with_time_series_points()
        # Each time series point occupies 20 bytes. Mesh server has a limitation of 4MB inbound message size.
        # In case of larger data volumes please send input data in chunks.
        # E.g.: call multiple times `write_timeseries_points` with shorter interval.
        # Send request to write time series based on time series key.
        timeseries = mesh.Timeseries(table=table, timskey=timeseries_key)
        session.write_timeseries_points(timeseries=timeseries)
        print("Time series points written using time series key.")
        # To only remove time series points we need to provide an empty PyArrow table, but with correct schema.
        empty_table = mesh.Timeseries.schema.empty_table()
        # If `start_time` and `end_time` are not provided explicitly they will be taken from PyArrow `table`.
        # Because we are providing empty table we must specify them explicitly.
        # For this interval all existing points will be removed.
        #
        # End time is exclusive so from the 3 points written by timeseries key,
        # 2 points will be removed by this call.
        #
        # Send request to write time series based on time series attribute path/full_name.
        timeseries = mesh.Timeseries(
            table=empty_table,
            start_time=datetime(2016, 1, 1, 1),
            end_time=datetime(2016, 1, 1, 3),
            full_name=timeseries_attribute_path,
        )
        session.write_timeseries_points(timeseries=timeseries)
        print("Time series points written using time series attribute path.")
        # Let's check it. We should get just one point.
        # Note:
        # - the `timeseries_attribute_path` and `timeseries_key` both point to the same time series attribute.
        # - the `end_time` in `read_timeseries_points` is also exclusive,
        #   but for time series with linear curve type Mesh is also returning one point after the interval.
        timeseries = session.read_timeseries_points(
            target=timeseries_key,
            start_time=datetime(2016, 1, 1, 1),
            end_time=datetime(2016, 1, 1, 3),
        )
        print(f"Read {timeseries.number_of_points} points using time series key.")
        print(timeseries.arrow_table.to_pandas())
        # Send request to write time series based on time series attribute ID.
        # Attribute IDs are auto-generated when an object is created.
        # That is why we can't use any fixed ID in this example and the code will throw.
        try:
            timeseries = mesh.Timeseries(table=table, uuid_id=timeseries_attribute_id)
            session.write_timeseries_points(timeseries=timeseries)
            print("Time series points written using time series attribute ID.")
        except Exception as e:
            print(
                f"failed to write time series points based on time series attribute ID: {e}"
            )
        # Commit the changes to the database.
        # session.commit()
        # Or discard changes.
        session.rollback()
async def async_write_timeseries_points(
    address,
    tls_root_pem_cert,
):
    """Write, remove and read back time series points (asynchronous API)."""
    print("Asynchronous write time series points:")
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.aio.Connection.with_tls(address, tls_root_pem_cert)
    connection = mesh.aio.Connection.insecure(address)
    async with connection.create_session() as session:
        table = get_pa_table_with_time_series_points()
        # Each time series point occupies 20 bytes. Mesh server has a limitation of 4MB inbound message size.
        # In case of larger data volumes please send input data in chunks.
        # E.g.: call multiple times `write_timeseries_points` with shorter interval.
        # Send request to write time series based on time series key.
        timeseries = mesh.Timeseries(table=table, timskey=timeseries_key)
        await session.write_timeseries_points(timeseries=timeseries)
        print("Time series points written using time series key.")
        # To only remove time series points we need to provide an empty PyArrow table, but with correct schema.
        empty_table = mesh.Timeseries.schema.empty_table()
        # If `start_time` and `end_time` are not provided explicitly they will be taken from PyArrow `table`.
        # Because we are providing empty table we must specify them explicitly.
        # For this interval all existing points will be removed.
        #
        # End time is exclusive so from the 3 points written by timeseries key,
        # 2 points will be removed by this call.
        #
        # Send request to write time series based on time series attribute path/full_name.
        timeseries = mesh.Timeseries(
            table=empty_table,
            start_time=datetime(2016, 1, 1, 1),
            end_time=datetime(2016, 1, 1, 3),
            full_name=timeseries_attribute_path,
        )
        await session.write_timeseries_points(timeseries=timeseries)
        print("Time series points written using time series attribute path.")
        # Let's check it. We should get just one point.
        # Note:
        # - the `timeseries_attribute_path` and `timeseries_key` both point to the same time series attribute.
        # - the `end_time` in `read_timeseries_points` is also exclusive,
        #   but for time series with linear curve type Mesh is also returning one point after the interval.
        timeseries = await session.read_timeseries_points(
            target=timeseries_key,
            start_time=datetime(2016, 1, 1, 1),
            end_time=datetime(2016, 1, 1, 3),
        )
        print(f"Read {timeseries.number_of_points} points using time series key.")
        print(timeseries.arrow_table.to_pandas())
        # Send request to write time series based on time series attribute ID.
        # Attribute IDs are auto-generated when an object is created.
        # That is why we can't use any fixed ID in this example and the code will throw.
        try:
            timeseries = mesh.Timeseries(table=table, uuid_id=timeseries_attribute_id)
            await session.write_timeseries_points(timeseries=timeseries)
            print("Time series points written using time series attribute ID.")
        except Exception as e:
            print(
                f"failed to write time series points based on time series attribute ID: {e}"
            )
        # Commit the changes to the database.
        # await session.commit()
        # Or discard changes.
        await session.rollback()
if __name__ == "__main__":
    # Run the synchronous example first, then the asyncio variant.
    address, tls_root_pem_cert = helpers.get_connection_info()
    sync_write_timeseries_points(address, tls_root_pem_cert)
    asyncio.run(async_write_timeseries_points(address, tls_root_pem_cert))
Run simulations¶
import asyncio
import logging
from datetime import datetime, timedelta
import helpers
import volue.mesh.aio
from volue import mesh
# Upper bound for inbound gRPC messages; passed to the Mesh connections below.
GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES = 10 * 1024 * 1024 # 10MB
def sync_run_simulation(address, tls_root_pem_cert):
    """Run a Mesh simulation and print its log messages and datasets (sync API)."""
    print("connecting...")
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(
    #     address,
    #     tls_root_pem_cert,
    #     grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    # )
    # By default the maximum inbound gRPC message size is 4MB. When Mesh server
    # returns datasets for longer simulation intervals the gRPC message size
    # may exceed this limit. In such cases the user can set new limit using
    # `grpc_max_receive_message_length` when creating a connection to Mesh.
    connection = mesh.Connection.insecure(
        address,
        grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    )
    with connection.create_session() as session:
        start_time = datetime(2023, 11, 1)
        end_time = datetime(2023, 11, 2)
        print("running simulation...")
        try:
            # The call streams a mix of log messages and (optionally) datasets.
            for response in session.run_simulation(
                "Mesh",
                "Cases/Demo",
                start_time,
                end_time,
                return_datasets=True,
                resolution=timedelta(minutes=5),
            ):
                if isinstance(response, mesh.LogMessage):
                    print(
                        f"[{logging.getLevelName(response.level)}] {response.message}"
                    )
                elif isinstance(response, mesh.HydSimDataset):
                    print(
                        f"Received dataset {response.name} with {len(response.data)} bytes"
                    )
            print("done")
        except Exception as e:
            print(f"failed to run simulation: {e}")
async def async_run_simulation(address, tls_root_pem_cert):
    """Run a Mesh simulation and print its log messages and datasets (async API)."""
    print("connecting...")
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.aio.Connection.with_tls(
    #     address,
    #     tls_root_pem_cert,
    #     grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    # )
    # By default the maximum inbound gRPC message size is 4MB. When Mesh server
    # returns datasets for longer simulation intervals the gRPC message size
    # may exceed this limit. In such cases the user can set new limit using
    # `grpc_max_receive_message_length` when creating a connection to Mesh.
    connection = mesh.aio.Connection.insecure(
        address,
        grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    )
    async with connection.create_session() as session:
        start_time = datetime(2023, 11, 1)
        end_time = datetime(2023, 11, 2)
        print("running simulation...")
        try:
            # The call streams a mix of log messages and (optionally) datasets.
            async for response in session.run_simulation(
                "Mesh",
                "Cases/Demo",
                start_time,
                end_time,
                return_datasets=True,
                resolution=timedelta(minutes=5),
            ):
                if isinstance(response, mesh.LogMessage):
                    print(
                        f"[{logging.getLevelName(response.level)}] {response.message}"
                    )
                elif isinstance(response, mesh.HydSimDataset):
                    print(
                        f"Received dataset {response.name} with {len(response.data)} bytes"
                    )
            print("done")
        except Exception as e:
            print(f"failed to run simulation: {e}")
if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    sync_run_simulation(address, tls_root_pem_cert)
    asyncio.run(async_run_simulation(address, tls_root_pem_cert))
Run inflow calculations¶
import asyncio
import logging
from datetime import datetime, timedelta
import helpers
import volue.mesh.aio
from volue import mesh
# Upper bound for inbound gRPC messages; passed to the Mesh connections below.
GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES = 10 * 1024 * 1024 # 10MB
def sync_run_inflow_calculation(address, tls_root_pem_cert):
    """Run an inflow calculation and print its log messages and datasets (sync API)."""
    print("connecting...")
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.Connection.with_tls(
    #     address,
    #     tls_root_pem_cert,
    #     grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    # )
    # By default the maximum inbound gRPC message size is 4MB. When Mesh server
    # returns datasets for longer inflow calculation intervals the gRPC message
    # size may exceed this limit. In such cases the user can set new limit
    # using `grpc_max_receive_message_length` when creating a connection to Mesh.
    connection = mesh.Connection.insecure(
        address,
        grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    )
    with connection.create_session() as session:
        start_time = datetime(2021, 1, 1)
        end_time = datetime(2021, 1, 2)
        print("running inflow calculation...")
        try:
            # The call streams a mix of log messages and (optionally) datasets.
            for response in session.run_inflow_calculation(
                "Mesh",
                "Area",
                "WaterCourse",
                start_time,
                end_time,
                return_datasets=True,
                resolution=timedelta(minutes=5),
            ):
                if isinstance(response, mesh.LogMessage):
                    print(
                        f"[{logging.getLevelName(response.level)}] {response.message}"
                    )
                elif isinstance(response, mesh.HydSimDataset):
                    print(
                        f"Received dataset {response.name} with {len(response.data)} bytes"
                    )
            print("done")
        except Exception as e:
            print(f"failed to run inflow calculation: {e}")
async def async_run_inflow_calculation(address, tls_root_pem_cert):
    """Run an inflow calculation and print its log messages and datasets (async API)."""
    print("connecting...")
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = mesh.aio.Connection.with_tls(
    #     address,
    #     tls_root_pem_cert,
    #     grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    # )
    # By default the maximum inbound gRPC message size is 4MB. When Mesh server
    # returns datasets for longer inflow calculation intervals the gRPC message
    # size may exceed this limit. In such cases the user can set new limit
    # using `grpc_max_receive_message_length` when creating a connection to Mesh.
    connection = mesh.aio.Connection.insecure(
        address,
        grpc_max_receive_message_length=GRPC_MAX_RECEIVE_MESSAGE_LENGTH_IN_BYTES,
    )
    async with connection.create_session() as session:
        start_time = datetime(2021, 1, 1)
        end_time = datetime(2021, 1, 2)
        print("running inflow calculation...")
        try:
            # The call streams a mix of log messages and (optionally) datasets.
            async for response in session.run_inflow_calculation(
                "Mesh",
                "Area",
                "WaterCourse",
                start_time,
                end_time,
                return_datasets=True,
                resolution=timedelta(minutes=5),
            ):
                if isinstance(response, mesh.LogMessage):
                    print(
                        f"[{logging.getLevelName(response.level)}] {response.message}"
                    )
                elif isinstance(response, mesh.HydSimDataset):
                    print(
                        f"Received dataset {response.name} with {len(response.data)} bytes"
                    )
            print("done")
        except Exception as e:
            print(f"failed to run inflow calculation: {e}")
if __name__ == "__main__":
    address, tls_root_pem_cert = helpers.get_connection_info()
    sync_run_inflow_calculation(address, tls_root_pem_cert)
    asyncio.run(async_run_inflow_calculation(address, tls_root_pem_cert))