# team-10/venv/Lib/site-packages/streamlit/source_util.py
# Copyright 2018-2022 Streamlit Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
import threading
from pathlib import Path
from typing import Any, Callable, cast, Dict, List, Optional, Tuple

from blinker import Signal

from streamlit.logger import get_logger
from streamlit.util import calc_md5

LOGGER = get_logger(__name__)


def open_python_file(filename):
"""Open a read-only Python file taking proper care of its encoding.
In Python 3, we would like all files to be opened with utf-8 encoding.
However, some author like to specify PEP263 headers in their source files
with their own encodings. In that case, we should respect the author's
encoding.
"""
import tokenize
if hasattr(tokenize, "open"): # Added in Python 3.2
# Open file respecting PEP263 encoding. If no encoding header is
# found, opens as utf-8.
return tokenize.open(filename)
else:
return open(filename, "r", encoding="utf-8")
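

# Illustrative note (not part of the original module): a script whose first or
# second line carries a PEP 263 header such as
#
#     # -*- coding: latin-1 -*-
#
# is decoded as latin-1 by tokenize.open(); scripts with no such header are
# read as utf-8.
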
PAGE_FILENAME_REGEX = re.compile(r"([0-9]*)[_ -]*(.*)\.py")
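
# Illustrative example (assumed filename, not from the original source): for a
# file named "02_🎈_My_Page.py", PAGE_FILENAME_REGEX captures
# group(1) == "02" (the ordering prefix) and group(2) == "🎈_My_Page"
# (the name, possibly starting with an emoji icon).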

# Regex pattern to extract emoji, taken from
# https://gist.github.com/Alex-Just/e86110836f3f93fe7932290526529cd1#gistcomment-3208085
# We may eventually want to swap this out for https://pypi.org/project/emoji,
# but I want to avoid adding a dependency if possible.
PAGE_ICON_REGEX = re.compile(
    "(^[\U0001F1E0-\U0001F1FF"
    "\U0001F300-\U0001F5FF"
    "\U0001F600-\U0001F64F"
    "\U0001F680-\U0001F6FF"
    "\U0001F700-\U0001F77F"
    "\U0001F780-\U0001F7FF"
    "\U0001F800-\U0001F8FF"
    "\U0001F900-\U0001F9FF"
    "\U0001FA00-\U0001FA6F"
    "\U0001FA70-\U0001FAFF"
    "\U00002702-\U000027B0"
    "\U000024C2-\U0001F251])[_-]*"
)
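
# Illustrative example (not part of the original source): PAGE_ICON_REGEX only
# matches an emoji at the *start* of a name, together with any "_" or "-"
# separators that immediately follow it, e.g.
#
#   >>> PAGE_ICON_REGEX.match("🎈_My_Page").group(1)
#   '🎈'
#   >>> PAGE_ICON_REGEX.match("My_🎈_Page") is None
#   True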


def page_sort_key(script_path: Path) -> Tuple[float, str]:
    matches = re.findall(PAGE_FILENAME_REGEX, script_path.name)

    # Failing this assert should only be possible if script_path isn't a
    # Python file, which should never happen.
    assert len(matches) > 0, f"{script_path} is not a Python file"

    [(number, label)] = matches
    label = label.lower()

    if number == "":
        return (float("inf"), label)

    return (float(number), label)
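
# Illustrative example (assumed filenames, not from the original source):
#
#   >>> page_sort_key(Path("pages/03_Results.py"))
#   (3.0, 'results')
#   >>> page_sort_key(Path("pages/About.py"))
#   (inf, 'about')
#
# Numbered pages therefore sort first, in numeric order, and unnumbered pages
# follow alphabetically.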


def page_name_and_icon(script_path: Path) -> Tuple[str, str]:
    """Compute the name of a page from its script path.

    This is *almost* the page name displayed in the nav UI, but it has
    underscores instead of spaces. We do this because having spaces in URLs
    both looks bad and is hard to deal with due to the need to URL-encode
    them. To solve this, we only swap the underscores for spaces right before
    we render page names.
    """
    extraction = re.search(PAGE_FILENAME_REGEX, script_path.name)
    if extraction is None:
        return "", ""

    # This cast to Any + type annotation weirdness is done because
    # cast(re.Match[str], ...) explodes at runtime since Python interprets it
    # as an attempt to index into re.Match instead of as a type annotation.
    extraction: re.Match[str] = cast(Any, extraction)

    name = re.sub(r"[_ ]+", "_", extraction.group(2)).strip()
    if not name:
        name = extraction.group(1)

    extracted_icon = re.search(PAGE_ICON_REGEX, name)
    if extracted_icon is not None:
        icon = str(extracted_icon.group(1))
        name = re.sub(PAGE_ICON_REGEX, "", name)
    else:
        icon = ""

    return str(name), icon
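
# Illustrative example (assumed filename, not from the original source):
#
#   >>> page_name_and_icon(Path("pages/02_🎈_My_Page.py"))
#   ('My_Page', '🎈')
#
# The numeric prefix is dropped, the leading emoji is split off as the icon,
# and the remaining name keeps its underscores.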


_pages_cache_lock = threading.RLock()
_cached_pages: Optional[Dict[str, Dict[str, str]]] = None

_on_pages_changed = Signal(doc="Emitted when the pages directory is changed")


def invalidate_pages_cache():
    global _cached_pages

    LOGGER.debug("Pages directory changed")
    with _pages_cache_lock:
        _cached_pages = None

    _on_pages_changed.send()
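
# Illustrative flow (assumption: some file-watching code elsewhere in
# Streamlit calls invalidate_pages_cache() when the pages/ directory changes;
# that caller is not part of this module):
#
#   >>> invalidate_pages_cache()  # clears _cached_pages and notifies listeners
#   >>> get_pages("app.py")       # the next call rescans pages/ from disk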


def get_pages(main_script_path_str: str) -> Dict[str, Dict[str, str]]:
    global _cached_pages

    # Avoid taking the lock if the pages cache hasn't been invalidated.
    pages = _cached_pages
    if pages is not None:
        return pages

    with _pages_cache_lock:
        # The cache may have been repopulated while we were waiting to grab
        # the lock.
        if _cached_pages is not None:
            return _cached_pages

        main_script_path = Path(main_script_path_str)
        main_page_name, main_page_icon = page_name_and_icon(main_script_path)
        main_page_script_hash = calc_md5(main_script_path_str)

        # NOTE: We include the page_script_hash in the dict even though it is
        # already used as the key because that occasionally makes things
        # easier for us when we need to iterate over pages.
        pages = {
            main_page_script_hash: {
                "page_script_hash": main_page_script_hash,
                "page_name": main_page_name,
                "icon": main_page_icon,
                "script_path": str(main_script_path.resolve()),
            }
        }

        pages_dir = main_script_path.parent / "pages"
        page_scripts = sorted(
            [f for f in pages_dir.glob("*.py") if not f.name.startswith(".")],
            key=page_sort_key,
        )

        for script_path in page_scripts:
            script_path_str = str(script_path.resolve())
            pn, pi = page_name_and_icon(script_path)
            psh = calc_md5(script_path_str)

            pages[psh] = {
                "page_script_hash": psh,
                "page_name": pn,
                "icon": pi,
                "script_path": script_path_str,
            }

        _cached_pages = pages

    return pages
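
# Illustrative example (paths and hashes assumed, not from the original
# source): for a main script "app.py" with a sibling "pages/01_About.py",
# get_pages("app.py") returns a dict shaped like
#
#   {
#       "<md5 of app.py>": {
#           "page_script_hash": "<md5 of app.py>",
#           "page_name": "app",
#           "icon": "",
#           "script_path": "/abs/path/to/app.py",
#       },
#       "<md5 of resolved pages/01_About.py>": {
#           "page_script_hash": "<md5 of resolved pages/01_About.py>",
#           "page_name": "About",
#           "icon": "",
#           "script_path": "/abs/path/to/pages/01_About.py",
#       },
#   }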


def register_pages_changed_callback(
    callback: Callable[[str], None],
):
    def disconnect():
        _on_pages_changed.disconnect(callback)

    # weak=False so that we have control of when the pages changed
    # callback is deregistered.
    _on_pages_changed.connect(callback, weak=False)

    return disconnect
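
# Illustrative usage (hypothetical callback, not from the original source):
#
#   def _pages_changed(_sender) -> None:
#       print("pages/ directory changed; rebuild the nav")
#
#   disconnect = register_pages_changed_callback(_pages_changed)
#   ...
#   disconnect()  # deregister the callback when it is no longer needed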