Mesh Availability¶
A Mesh availability event provides information about periods during which Mesh objects are unavailable or only partially available. The availability functionality helps track and model the operational status of assets over time.
Types of Availability Events¶
Mesh Python SDK supports two types of availability events:
Revision - Represents periods when an object is completely unavailable (e.g., during maintenance or outage)
Restriction - Represents periods when an object is partially available with reduced capacity
Availability Recurrence¶
Availability events can be defined with various recurrence patterns:
Single occurrence - One-time events with a specific start and end time
Daily - Repeating pattern on a daily basis
Weekly - Repeating pattern on a weekly basis
Monthly - Repeating pattern on a monthly basis
Yearly - Repeating pattern on a yearly basis
Each recurrence pattern allows specifying:
How often the pattern repeats (e.g., every 2 weeks)
When the pattern ends (if applicable)
The time interval for each occurrence
Instances¶
An instance is a single occurrence of a revision or restriction according to its recurrence pattern. When analyzing availability, you can search for all instances that occur within a specified time period.
For example, a weekly revision occurring every Monday for two months would have approximately eight instances, one for each Monday in the specified period.
Categories and Statuses¶
Availability events can have:
Status - Indicates the current state of the event (e.g., planned, confirmed, cancelled)
Category - Classifies the type of event (e.g., maintenance, external constraints)
Reason - Text description explaining why the event exists
Common Use Cases¶
The availability functionality can be used for:
Maintenance Planning - Scheduling and tracking planned downtime periods
Capacity Management - Modeling partial capacity constraints
Outage Tracking - Documenting unexpected unavailability periods
Resource Allocation - Planning operations around known availability constraints
When working with availability data, you can search for events affecting specific objects, find all instances occurring within a time period, and incorporate availability information into operational planning and analysis.
The following example shows different ways of working with availability.
from datetime import datetime
import dateutil
import helpers
from volue.mesh import Connection
from volue.mesh.availability import (
EventType,
Recurrence,
RecurrenceType,
RestrictionBasicRecurrence,
RestrictionComplexRecurrence,
Revision,
TimePoint,
)
# Path of the Mesh object that every availability call in this example
# passes as its `target` (or in its `targets` list).
CHIMNEY_PATH = "Model/SimpleThermalTestModel/ThermalComponent/SomePowerPlant1/SomePowerPlantChimney2"
def revision_workflow(session: Connection.Session):
    """
    Demonstrates a complete workflow for creating, managing, and deleting revisions in Mesh.

    All calls target the object at ``CHIMNEY_PATH``. The steps are: create a
    revision, add a single-occurrence recurrence, read the revision back, add
    a daily recurrence, search for revision events, search for instances,
    update the revision, delete one recurrence and finally delete the
    revision. Progress is printed to stdout.

    Args:
        session: Mesh session whose ``availability`` API is used for every call.
    """
    availability = session.availability
    utc = dateutil.tz.UTC

    print("\n=== Starting Revision Workflow Example ===\n")

    # 1. Create a revision
    print("1. Creating a new revision...")
    revision = availability.create_revision(
        target=CHIMNEY_PATH,
        event_id="event_id",
        local_id="local_id",
        reason="Revision reason",
    )
    # Every later step addresses the revision through the ID reported back by
    # the create call, so the literal above is the only place to change it.
    event_id = revision.event_id
    print(f" Created revision with ID: {revision.event_id}")
    print(f" Owner ID: {revision.owner_id}")
    print(f" Created by: {revision.created.author} at {revision.created.timestamp}")
    print(f" Initial recurrences count: {len(revision.recurrences)}")

    # 2. Add a recurrence to the revision
    print("\n2. Adding a basic recurrence...")
    recurrence_id = availability.add_revision_recurrence(
        target=CHIMNEY_PATH,
        event_id=event_id,
        period_start=datetime(2023, 1, 1, tzinfo=utc),
        period_end=datetime(2023, 1, 2, tzinfo=utc),
        recurrence=Recurrence(
            status="Planned",
            description="Recurrence",
            recurrence_type=RecurrenceType.NONE,
        ),
    )
    print(f" Added recurrence with ID: {recurrence_id}")

    # 3. Get the revision
    print("\n3. Getting the revision with its new recurrence...")
    revision_with_recurrence = availability.get_availability_event(
        target=CHIMNEY_PATH,
        event_id=event_id,
    )
    print(f" Retrieved revision with ID: {revision_with_recurrence.event_id}")
    print(f" Recurrences count: {len(revision_with_recurrence.recurrences)}")
    if revision_with_recurrence.recurrences:
        recurrence = revision_with_recurrence.recurrences[0]
        print(f" Recurrence status: {recurrence.recurrence.status}")
        print(f" Recurrence description: {recurrence.recurrence.description}")
        print(
            f" Recurrence period: {recurrence.period_start} to {recurrence.period_end}"
        )

    # 4. Add another recurrence with a different pattern
    print("\n4. Adding a daily repeating recurrence...")
    second_recurrence_id = availability.add_revision_recurrence(
        target=CHIMNEY_PATH,
        event_id=event_id,
        period_start=datetime(2023, 2, 1, tzinfo=utc),
        period_end=datetime(2023, 2, 2, tzinfo=utc),
        recurrence=Recurrence(
            status="Planned",
            description="Second Recurrence",
            recurrence_type=RecurrenceType.DAILY,
            recur_every=2,
            recur_until=datetime(2023, 2, 15, tzinfo=utc),
        ),
    )
    print(f" Added second recurrence with ID: {second_recurrence_id}")

    # 5. Search for the revision using search_availability_events
    print("\n5. Searching for revisions...")
    search_results = availability.search_availability_events(
        event_type=EventType.REVISION,
        targets=[CHIMNEY_PATH],
    )
    print(f" Found {len(search_results)} revision events")
    for i, result in enumerate(search_results):
        if isinstance(result, Revision):
            print(
                f" Result {i+1}: Event ID: {result.event_id}, Reason: {result.reason}"
            )

    # 6. Search for instances of the revision
    print("\n6. Searching for specific instances of the revision...")
    instances = availability.search_instances(
        target=CHIMNEY_PATH,
        event_id=event_id,
        period_start=datetime(2023, 2, 1, tzinfo=utc),
        period_end=datetime(2023, 2, 15, tzinfo=utc),
    )
    print(f" Found {len(instances)} instances")
    # These are the actual occurrences based on the recurrence pattern
    for i, instance in enumerate(instances):
        print(
            f" Instance {i+1}: Period: {instance.period_start} to {instance.period_end}"
        )

    # 7. Update the revision
    print("\n7. Updating the revision...")
    availability.update_revision(
        target=CHIMNEY_PATH,
        event_id=event_id,
        new_local_id="updated_local_id",
        new_reason="Updated reason",
    )
    # Read the revision back to show the updated fields.
    updated_revision = availability.get_availability_event(
        target=CHIMNEY_PATH, event_id=event_id
    )
    print(f" Updated local ID: {updated_revision.local_id}")
    print(f" Updated reason: {updated_revision.reason}")
    print(f" Last modified: {updated_revision.last_changed.timestamp}")

    # 8. Delete a specific recurrence
    print("\n8. Deleting the second recurrence pattern...")
    availability.delete_revision_recurrence(
        target=CHIMNEY_PATH,
        event_id=event_id,
        recurrence_id=second_recurrence_id,
    )
    revision_after_delete = availability.get_availability_event(
        target=CHIMNEY_PATH, event_id=event_id
    )
    print(f" Recurrences remaining: {len(revision_after_delete.recurrences)}")

    # 9. Finally, delete the revision using delete_availability_events_by_id
    print("\n9. Deleting the entire revision...")
    availability.delete_availability_events_by_id(
        target=CHIMNEY_PATH,
        event_ids=[event_id],
    )
    # Search again to show how many revisions are left on the target.
    remaining_revisions = availability.search_availability_events(
        event_type=EventType.REVISION,
        targets=[CHIMNEY_PATH],
    )
    print(f" Revisions found: {len(remaining_revisions)}")

    print("\n=== Revision Workflow Example Completed ===\n")
def _print_restriction_details(restriction):
    """Print the ID, local ID, reason, category and value(s) of a retrieved restriction."""
    print(f" Retrieved restriction with ID: {restriction.event_id}")
    print(f" Local ID: {restriction.local_id}")
    print(f" Reason: {restriction.reason}")
    print(f" Category: {restriction.category}")
    # Check the type of recurrence and print the value accordingly
    recurrence = restriction.recurrence
    if isinstance(recurrence, RestrictionBasicRecurrence):
        print(f" Value: {recurrence.value}")
    elif isinstance(recurrence, RestrictionComplexRecurrence):
        # In case of complex recurrence we can print the time points
        for i, point in enumerate(recurrence.time_points):
            print(
                f" Time point {i+1}: Value: {point.value}, Timestamp: {point.timestamp}"
            )


def restriction_workflow(session: Connection.Session):
    """
    Demonstrates a complete workflow for creating, managing, and deleting restrictions in Mesh.

    All calls target the object at ``CHIMNEY_PATH``. The steps are: create a
    basic (single value) restriction and a complex (multiple time points)
    restriction, search for restrictions, read both back, search for
    instances of the basic restriction, update its attributes and then its
    recurrence, and finally delete both restrictions. Progress is printed to
    stdout.

    Args:
        session: Mesh session whose ``availability`` API is used for every call.
    """
    availability = session.availability
    utc = dateutil.tz.UTC
    basic_id = "basic_restriction_id"
    complex_id = "complex_restriction_id"

    print("\n=== Starting Restriction Workflow Example ===\n")

    # 1. Create a basic restriction with constant value
    print("1. Creating a basic restriction...")
    basic_restriction = availability.create_restriction(
        target=CHIMNEY_PATH,
        event_id=basic_id,
        local_id="basic_local_id",
        reason="basic restriction",
        category="DischargeMin[m3/s]",
        recurrence=RestrictionBasicRecurrence(
            recurrence=Recurrence(
                status="SelfImposed",
                description="Basic restriction",
                recurrence_type=RecurrenceType.WEEKLY,
                recur_every=1,
                recur_until=datetime(2023, 1, 31, tzinfo=utc),
            ),
            period_start=datetime(2023, 1, 2, tzinfo=utc),  # Monday
            period_end=datetime(2023, 1, 3, tzinfo=utc),  # Tuesday
            # NOTE(review): the unit presumably follows the category
            # (m3/s) rather than being a percentage - confirm.
            value=75.5,
        ),
    )
    print(f" Created basic restriction with ID: {basic_restriction.event_id}")
    print(f" Owner ID: {basic_restriction.owner_id}")
    print(f" Category: {basic_restriction.category}")
    print(f" Value: {basic_restriction.recurrence.value}")
    print(
        f" Created by: {basic_restriction.created.author} at {basic_restriction.created.timestamp}"
    )
    print(f" Status: {basic_restriction.recurrence.recurrence.status}")

    # 2. Create a complex restriction with multiple time points
    print("\n2. Creating a complex restriction with multiple time points...")
    complex_restriction = availability.create_restriction(
        target=CHIMNEY_PATH,
        event_id=complex_id,
        local_id="complex_local_id",
        reason="Complex restriction",
        category="DischargeMax[m3/s]",
        recurrence=RestrictionComplexRecurrence(
            recurrence=Recurrence(
                status="SelfImposed",
                description="Complex restriction",
                recurrence_type=RecurrenceType.DAILY,
                recur_every=1,
                recur_until=datetime(2023, 1, 15, tzinfo=utc),
            ),
            time_points=[
                TimePoint(
                    value=80.0,
                    timestamp=datetime(2023, 1, 1, 8, 0, tzinfo=utc),
                ),
                TimePoint(
                    value=60.0,
                    timestamp=datetime(2023, 1, 1, 12, 0, tzinfo=utc),
                ),
                TimePoint(
                    value=70.0,
                    timestamp=datetime(2023, 1, 1, 16, 0, tzinfo=utc),
                ),
                TimePoint(
                    value=90.0,
                    timestamp=datetime(2023, 1, 1, 20, 0, tzinfo=utc),
                ),
            ],
        ),
    )
    print(f" Created complex restriction with ID: {complex_restriction.event_id}")
    print(f" Category: {complex_restriction.category}")
    print(
        f" Number of time points: {len(complex_restriction.recurrence.time_points)}"
    )
    print(
        f" Recurrence type: {complex_restriction.recurrence.recurrence.recurrence_type.name}"
    )
    print(f" Repeats until: {complex_restriction.recurrence.recurrence.recur_until}")

    # 3. Search for restrictions
    print("\n3. Searching for all restrictions...")
    restrictions = availability.search_availability_events(
        event_type=EventType.RESTRICTION,
        targets=[CHIMNEY_PATH],
    )
    print(f" Found {len(restrictions)} restrictions")
    for i, restriction in enumerate(restrictions):
        print(
            f" Restriction {i+1}: ID: {restriction.event_id}, Category: {restriction.category}"
        )

    # 4. Get a specific restriction by ID
    # In this case we expect the recurrence to be of type RestrictionBasicRecurrence
    print("\n4. Getting specific restriction details...")
    _print_restriction_details(
        availability.get_availability_event(
            target=CHIMNEY_PATH,
            event_id=basic_id,
        )
    )

    # 4.5 Get a specific restriction by ID
    # In this case we expect the recurrence to be of type RestrictionComplexRecurrence
    print("\n4.5. Getting specific restriction details...")
    _print_restriction_details(
        availability.get_availability_event(
            target=CHIMNEY_PATH,
            event_id=complex_id,
        )
    )

    # 5. Search for instances within a time period
    print("\n5. Searching for specific instances of the basic restriction...")
    instances = availability.search_instances(
        target=CHIMNEY_PATH,
        event_id=basic_id,
        period_start=datetime(2023, 1, 1, tzinfo=utc),
        period_end=datetime(2023, 1, 31, tzinfo=utc),
    )
    print(f" Found {len(instances)} instances")
    # Print every instance returned for the search period
    for i, instance in enumerate(instances):
        print(
            f" Instance {i+1}: Period: {instance.period_start} to {instance.period_end}, Value: {instance.value}"
        )

    # 6. Update restriction
    print("\n6. Updating the basic restriction...")
    availability.update_restriction(
        target=CHIMNEY_PATH,
        event_id=basic_id,
        new_local_id="updated_basic_id",
        new_reason="Updated basic restriction reason",
        new_category="DischargeMax[m3/s]",
    )
    # Verify the update
    updated_restriction = availability.get_availability_event(
        target=CHIMNEY_PATH,
        event_id=basic_id,
    )
    print(f" Updated local ID: {updated_restriction.local_id}")
    print(f" Updated reason: {updated_restriction.reason}")
    print(f" Updated category: {updated_restriction.category}")
    print(f" Last modified: {updated_restriction.last_changed.timestamp}")

    # 7. Update restriction recurrence
    print("\n7. Updating the restriction recurrence...")
    new_recurrence = RestrictionBasicRecurrence(
        recurrence=Recurrence(
            status="SelfImposed",
            description="Updated restriction recurrence",
            recurrence_type=RecurrenceType.NONE,
        ),
        period_start=datetime(2023, 2, 1, tzinfo=utc),
        period_end=datetime(2023, 2, 10, tzinfo=utc),
        value=50.0,
    )
    availability.update_restriction(
        target=CHIMNEY_PATH,
        event_id=basic_id,
        new_restriction_recurrence=new_recurrence,
    )
    # Read the restriction back to show the replaced recurrence.
    updated_restriction = availability.get_availability_event(
        target=CHIMNEY_PATH,
        event_id=basic_id,
    )
    print(
        f" New recurrence period: {updated_restriction.recurrence.period_start} to {updated_restriction.recurrence.period_end}"
    )
    print(f" New value: {updated_restriction.recurrence.value}")
    print(
        f" New recurrence type: {updated_restriction.recurrence.recurrence.recurrence_type.name}"
    )

    # 8. Delete restrictions
    print("\n8. Deleting restrictions...")
    availability.delete_availability_events_by_id(
        target=CHIMNEY_PATH,
        event_ids=[basic_id, complex_id],
    )
    # Verify deletion
    remaining_restrictions = availability.search_availability_events(
        event_type=EventType.RESTRICTION,
        targets=[CHIMNEY_PATH],
    )
    print(f" Restrictions found: {len(remaining_restrictions)}")

    print("\n=== Restriction Workflow Example Completed ===\n")
def main(address, tls_root_pem_cert):
    """Run the revision and restriction workflow examples in one Mesh session.

    Args:
        address: Passed to ``Connection.insecure`` to open the connection.
        tls_root_pem_cert: Not used by the insecure connection created here;
            it is only referenced by the commented-out TLS alternative below.
    """
    # For production environments create connection using: with_tls, with_kerberos, or with_external_access_token, e.g.:
    # connection = Connection.with_tls(address, tls_root_pem_cert)
    mesh_connection = Connection.insecure(address)

    with mesh_connection.create_session() as session:
        # Both workflows share the same session and run in this order.
        for workflow in (revision_workflow, restriction_workflow):
            workflow(session)
if __name__ == "__main__":
    # Connection details come from helpers.get_connection_info().
    server_address, root_pem_cert = helpers.get_connection_info()
    main(server_address, root_pem_cert)
Detailed API specification can be found here: volue.mesh.availability