Source code for volue.mesh.calc.common

Common classes/enums/etc for Mesh calculation functions.

import datetime
import uuid
from enum import Enum
from typing import List, Union

from dateutil import tz

from volue.mesh import AttributeBase, Object, Timeseries
from volue.mesh._common import (
from volue.mesh._mesh_id import _to_proto_calculation_target_mesh_id
from volue.mesh.proto.calc.v1alpha import calc_pb2

[docs]class Timezone(Enum): """ Timezone specifier. Args: LOCAL: Local time zone. STANDARD: Local time zone without Daylight Saving Time (DST). UTC: Universal Time Coordinated (UTC). """ LOCAL = 0 STANDARD = 1 UTC = 2
def _convert_datetime_to_mesh_calc_format(input: datetime.datetime) -> str: """ Converts input datetime to format expected by Mesh calculator. Datetime is converted to UTC. If input datetime is time zone naive then it is already treated as UTC. Example: 'UTC20210917000000000' Args: input: The timestamp to be converted. Returns: Datetime in Mesh calculator format. """ input_utc_datetime = input if input.tzinfo is not None: input_utc_datetime = input.astimezone(tz.UTC) converted_date_str = input_utc_datetime.strftime("%Y%m%d%H%M%S%f")[:-3] converted_date_str = f"UTC{converted_date_str}" return converted_date_str def _parse_timeseries_list_response( response: calc_pb2.CalculationResponse, ) -> List[Timeseries]: """ Helper function for parsing a calculator response. Args: response: The gRPC response received from the Mesh server. Returns: A list of time series. """ timeseries = _read_proto_reply(response.timeseries_results) return timeseries def _parse_single_timeseries_response( response: calc_pb2.CalculationResponse, ) -> Timeseries: """ Helper function for parsing a calculator response. Args: response: The gRPC response received from the Mesh server. Returns: A single time series. Raises: RuntimeError: Error message raised if the input is not valid. """ timeseries = _read_proto_reply(response.timeseries_results) if len(timeseries) != 1: raise RuntimeError( f"invalid calculation result, expected 1 timeseries, but got {len(timeseries)}" ) return timeseries[0] def _parse_single_float_response(response: calc_pb2.CalculationResponse) -> float: """ Helper function for parsing a calculator response. Args: response: The gRPC response received from the Mesh server. Returns: Result of the calculation. Raises: RuntimeError: Error message raised if the input is not valid. """ result = _read_proto_numeric_reply(response.numeric_results) if len(result) != 1: raise RuntimeError( f"invalid calculation result, expected 1 float value, but got {len(result)}" ) return result[0] class _Calculation: """ Base class for all calculations. """ def __init__( self, session, target: Union[uuid.UUID, str, int, AttributeBase, Object], start_time: datetime.datetime, end_time: datetime.datetime, ): """ Args: session: Active Mesh session. target: Mesh object, attribute, virtual or physical time series the calculation expression will be evaluated relative to. It could be a time series key, Universal Unique Identifier or a path in the :ref:`Mesh model <mesh_model>`. start_time: The start date and time of the time series interval. end_time: The end date and time of the time series interval. """ self.session = session Union[uuid.UUID, str, int, AttributeBase, Object] = target self.start_time: datetime.datetime = start_time self.end_time: datetime.datetime = end_time def prepare_request(self, expression: str) -> calc_pb2.CalculationRequest: """ Checks that the requirements for a calculation request are met, and constructs a calculation request object. Args: expression: Expression which consists of one or more functions to call. See :ref:`expressions <mesh expression>`. Returns: gRPC calculation request. """ request = calc_pb2.CalculationRequest( session_id=_to_proto_guid(self.session.session_id), expression=expression, interval=_to_proto_utcinterval(self.start_time, self.end_time), relative_to=_to_proto_calculation_target_mesh_id(, ) return request async def run_async(self, expression: str) -> calc_pb2.CalculationResponse: """ Run a function using an asynchronous connection. Args: expression: Expression which consists of one or more functions to call. See :ref:`expressions <mesh expression>`. Returns: gRPC calculation response. """ from volue.mesh.aio import Connection as AsyncConnection if not isinstance(self.session, AsyncConnection.Session): raise TypeError( "async connection session is required to run async calculations, but got sync session" ) request = self.prepare_request(expression) response = await self.session.calc_service.RunCalculation(request) return response def run(self, expression: str): """ Run a function using a synchronous connection. Args: expression: Expression which consists of one or more functions to call. See :ref:`expressions <mesh expression>`. Returns: gRPC calculation response. """ from volue.mesh import Connection if not isinstance(self.session, Connection.Session): raise TypeError( "sync connection session is required to run sync calculations, but got async session" ) request = self.prepare_request(expression) response = self.session.calc_service.RunCalculation(request) return response