team-10/env/Lib/site-packages/streamlit/runtime/stats.py
2025-08-02 07:34:44 +02:00

109 lines
3.7 KiB
Python

# Copyright (c) Streamlit Inc. (2018-2022) Snowflake Inc. (2022-2025)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import itertools
from abc import abstractmethod
from typing import TYPE_CHECKING, NamedTuple, Protocol, runtime_checkable
if TYPE_CHECKING:
from streamlit.proto.openmetrics_data_model_pb2 import Metric as MetricProto
class CacheStat(NamedTuple):
"""Describes a single cache entry.
Properties
----------
category_name : str
A human-readable name for the cache "category" that the entry belongs
to - e.g. "st.memo", "session_state", etc.
cache_name : str
A human-readable name for cache instance that the entry belongs to.
For "st.memo" and other function decorator caches, this might be the
name of the cached function. If the cache category doesn't have
multiple separate cache instances, this can just be the empty string.
byte_length : int
The entry's memory footprint in bytes.
"""
category_name: str
cache_name: str
byte_length: int
def to_metric_str(self) -> str:
return f'cache_memory_bytes{{cache_type="{self.category_name}",cache="{self.cache_name}"}} {self.byte_length}'
def marshall_metric_proto(self, metric: MetricProto) -> None:
"""Fill an OpenMetrics `Metric` protobuf object."""
label = metric.labels.add()
label.name = "cache_type"
label.value = self.category_name
label = metric.labels.add()
label.name = "cache"
label.value = self.cache_name
metric_point = metric.metric_points.add()
metric_point.gauge_value.int_value = self.byte_length
def group_stats(stats: list[CacheStat]) -> list[CacheStat]:
"""Group a list of CacheStats by category_name and cache_name and sum byte_length."""
def key_function(individual_stat: CacheStat) -> tuple[str, str]:
return individual_stat.category_name, individual_stat.cache_name
result: list[CacheStat] = []
sorted_stats = sorted(stats, key=key_function)
grouped_stats = itertools.groupby(sorted_stats, key=key_function)
for (category_name, cache_name), single_group_stats in grouped_stats:
result.append(
CacheStat(
category_name=category_name,
cache_name=cache_name,
byte_length=sum(item.byte_length for item in single_group_stats),
)
)
return result
@runtime_checkable
class CacheStatsProvider(Protocol):
@abstractmethod
def get_stats(self) -> list[CacheStat]:
raise NotImplementedError
class StatsManager:
def __init__(self) -> None:
self._cache_stats_providers: list[CacheStatsProvider] = []
def register_provider(self, provider: CacheStatsProvider) -> None:
"""Register a CacheStatsProvider with the manager.
This function is not thread-safe. Call it immediately after
creation.
"""
self._cache_stats_providers.append(provider)
def get_stats(self) -> list[CacheStat]:
"""Return a list containing all stats from each registered provider."""
all_stats: list[CacheStat] = []
for provider in self._cache_stats_providers:
all_stats.extend(provider.get_stats())
return all_stats