# Copyright (c) Streamlit Inc. (2018-2022) Snowflake Inc. (2022-2025)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# NOTE: We won't always be able to import from snowflake.{connector, snowpark}.* so need
# the `type: ignore` comment below, but that comment will explode if `warn-unused-ignores`
# is turned on when the package is available. Unfortunately, mypy doesn't provide a good
# way to configure this at a per-line level :(
# mypy: no-warn-unused-ignores

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Final, cast

from streamlit import logger
from streamlit.connections import BaseConnection
from streamlit.connections.util import running_in_sis
from streamlit.errors import StreamlitAPIException
from streamlit.runtime.caching import cache_data

_LOGGER: Final = logger.get_logger(__name__)

if TYPE_CHECKING:
    from datetime import timedelta

    from pandas import DataFrame
    from snowflake.connector.cursor import SnowflakeCursor  # type:ignore[import]
    from snowflake.snowpark.session import Session  # type:ignore[import]

    from snowflake.connector import (  # type:ignore[import] # isort: skip
        SnowflakeConnection as InternalSnowflakeConnection,
    )

# the ANSI-compliant SQL code for "connection was not established"
# (see docs: https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#id6)
SQLSTATE_CONNECTION_WAS_NOT_ESTABLISHED: Final = "08001"


class SnowflakeConnection(BaseConnection["InternalSnowflakeConnection"]):
    """A connection to Snowflake using the Snowflake Connector for Python.

    Initialize this connection object using ``st.connection("snowflake")`` or
    ``st.connection("", type="snowflake")``. Connection parameters for a
    SnowflakeConnection can be specified using ``secrets.toml`` and/or
    ``**kwargs``. Connection parameters are passed to
    |snowflake.connector.connect()|.

    When an app is running in Streamlit in Snowflake,
    ``st.connection("snowflake")`` connects automatically using the app owner's
    role without further configuration. ``**kwargs`` will be ignored in this
    case. Use ``secrets.toml`` and ``**kwargs`` to configure your connection
    for local development.

    SnowflakeConnection includes several convenience methods. For example, you
    can directly execute a SQL query with ``.query()`` or access the underlying
    Snowflake Connector object with ``.raw_connection``.

    .. |snowflake.connector.connect()| replace:: ``snowflake.connector.connect()``
    .. _snowflake.connector.connect(): https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-api#label-snowflake-connector-methods-connect

    .. Tip::
        `snowflake-snowpark-python `_ must be installed in your environment to
        use this connection. You can install Snowflake extras along with
        Streamlit:

        >>> pip install streamlit[snowflake]

    .. Important::
        Account identifiers must be of the form ``<orgname>-<account_name>``
        where ``<orgname>`` is the name of your Snowflake organization and
        ``<account_name>`` is the unique name of your account within your
        organization. This is dash-separated, not dot-separated like when used
        in SQL queries. For more information, see `Account identifiers `_.

    Examples
    --------

    **Example 1: Configuration with Streamlit secrets**

    You can configure your Snowflake connection using Streamlit's
    `Secrets management `_. For example, if you have MFA enabled on your
    account, you can connect using `key-pair authentication `_.

    ``.streamlit/secrets.toml``:

    >>> [connections.snowflake]
    >>> account = "xxx-xxx"
    >>> user = "xxx"
    >>> private_key_file = "/xxx/xxx/xxx.p8"
    >>> role = "xxx"
    >>> warehouse = "xxx"
    >>> database = "xxx"
    >>> schema = "xxx"

    Your app code:

    >>> import streamlit as st
    >>> conn = st.connection("snowflake")
    >>> df = conn.query("SELECT * FROM my_table")

    **Example 2: Configuration with keyword arguments and external authentication**

    You can configure your Snowflake connection with keyword arguments. The
    keyword arguments are merged with (and take precedence over) the values in
    ``secrets.toml``. However, if you name your connection ``"snowflake"`` and
    don't have a ``[connections.snowflake]`` dictionary in your
    ``secrets.toml`` file, Streamlit will ignore any keyword arguments and use
    the default Snowflake connection as described in Example 5 and Example 6.
    To configure your connection using only keyword arguments, declare a name
    for the connection other than ``"snowflake"``.

    For example, if your Snowflake account supports SSO, you can set up a quick
    local connection for development using `browser-based SSO `_. Because there
    is nothing configured in ``secrets.toml``, the name is an empty string and
    the type is set to ``"snowflake"``. This prevents Streamlit from ignoring
    the keyword arguments and using a default Snowflake connection.

    >>> import streamlit as st
    >>> conn = st.connection(
    ...     "",
    ...     type="snowflake",
    ...     account="xxx-xxx",
    ...     user="xxx",
    ...     authenticator="externalbrowser",
    ... )
    >>> df = conn.query("SELECT * FROM my_table")

    **Example 3: Named connection with Snowflake's connection configuration file**

    Snowflake's Python Connector supports a `connection configuration file `_,
    which is well integrated with Streamlit's ``SnowflakeConnection``. If you
    already have one or more connections configured, all you need to do is pass
    the name of the connection to use.

    ``~/.snowflake/connections.toml``:

    >>> [my_connection]
    >>> account = "xxx-xxx"
    >>> user = "xxx"
    >>> password = "xxx"
    >>> warehouse = "xxx"
    >>> database = "xxx"
    >>> schema = "xxx"

    Your app code:

    >>> import streamlit as st
    >>> conn = st.connection("my_connection", type="snowflake")
    >>> df = conn.query("SELECT * FROM my_table")

    **Example 4: Named connection with Streamlit secrets and Snowflake's connection configuration file**

    If you have a Snowflake configuration file with a connection named
    ``my_connection`` as in Example 3, you can pass the connection name through
    ``secrets.toml``.

    ``.streamlit/secrets.toml``:

    >>> [connections.snowflake]
    >>> connection_name = "my_connection"

    Your app code:

    >>> import streamlit as st
    >>> conn = st.connection("snowflake")
    >>> df = conn.query("SELECT * FROM my_table")

    **Example 5: Default connection with an environment variable**

    If you don't have a ``[connections.snowflake]`` dictionary in your
    ``secrets.toml`` file and use ``st.connection("snowflake")``, Streamlit
    will use the default connection for the `Snowflake Python Connector `_.

    If you have a Snowflake configuration file with a connection named
    ``my_connection`` as in Example 3, you can set an environment variable to
    declare it as the default Snowflake connection.

    >>> SNOWFLAKE_DEFAULT_CONNECTION_NAME = "my_connection"

    Your app code:

    >>> import streamlit as st
    >>> conn = st.connection("snowflake")
    >>> df = conn.query("SELECT * FROM my_table")

    **Example 6: Default connection in Snowflake's connection configuration file**

    If you have a Snowflake configuration file that defines your ``default``
    connection, Streamlit will automatically use it if no other connection is
    declared.

    ``~/.snowflake/connections.toml``:

    >>> [default]
    >>> account = "xxx-xxx"
    >>> user = "xxx"
    >>> password = "xxx"
    >>> warehouse = "xxx"
    >>> database = "xxx"
    >>> schema = "xxx"

    Your app code:

    >>> import streamlit as st
    >>> conn = st.connection("snowflake")
    >>> df = conn.query("SELECT * FROM my_table")

    """

    def _connect(self, **kwargs: Any) -> InternalSnowflakeConnection:
        """Create (or retrieve) the underlying Snowflake Connector connection.

        When running in Streamlit in Snowflake, the connection of the active
        Snowpark session is returned and ``kwargs`` are not used. Otherwise a
        new connection is opened from, in order of preference: the Streamlit
        secrets merged with ``kwargs`` (``kwargs`` win), the Snowflake
        connector's default configuration when this connection is named
        ``"snowflake"``, or ``kwargs`` alone.

        Raises
        ------
        StreamlitAPIException
            If the connector raises an error and neither secrets nor kwargs
            were provided. Any other connector error is re-raised unchanged.
        """
        import snowflake.connector  # type:ignore[import]
        from snowflake.connector import Error as SnowflakeError  # type:ignore[import]

        # If we're running in SiS, just call get_active_session() and retrieve the
        # lower-level connection from it.
        if running_in_sis():
            from snowflake.snowpark.context import (  # type:ignore[import] # isort: skip
                get_active_session,
            )

            session = get_active_session()

            if hasattr(session, "connection"):
                return session.connection

            # session.connection is only a valid attr in more recent versions of
            # snowflake-connector-python, so we fall back to grabbing
            # session._conn._conn if `.connection` is unavailable.
            return session._conn._conn

        # We require qmark-style parameters everywhere for consistency across different
        # environments where SnowflakeConnections may be used.
        snowflake.connector.paramstyle = "qmark"

        # Read the secrets before entering the `try` so that the name is always
        # bound when the `except` handler below inspects it.
        st_secrets = self._secrets.to_dict()

        # Otherwise, attempt to create a new connection from whatever credentials we
        # have available.
        try:
            if st_secrets:
                _LOGGER.info(
                    "Connect to Snowflake using the Streamlit secret defined under "
                    "[connections.snowflake]."
                )
                conn_kwargs = {**st_secrets, **kwargs}
                return snowflake.connector.connect(**conn_kwargs)

            # Use the default configuration as defined in https://docs.snowflake.cn/en/developer-guide/python-connector/python-connector-connect#setting-a-default-connection
            if self._connection_name == "snowflake":
                _LOGGER.info(
                    "Connect to Snowflake using the default configuration as defined "
                    "in https://docs.snowflake.cn/en/developer-guide/python-connector/python-connector-connect#setting-a-default-connection"
                )
                return snowflake.connector.connect()

            return snowflake.connector.connect(**kwargs)
        except SnowflakeError as err:
            # Only translate the error when the user supplied no configuration at
            # all; otherwise the connector's own error is the most informative.
            if not st_secrets and not kwargs:
                raise StreamlitAPIException(
                    "Missing Snowflake connection configuration. "
                    "Did you forget to set this in `secrets.toml`, a Snowflake configuration file, "
                    "or as kwargs to `st.connection`? "
                    "See the [SnowflakeConnection configuration documentation]"
                    "(https://docs.streamlit.io/st.connections.snowflakeconnection-configuration) "
                    "for more details and examples."
                ) from err
            raise

    def query(
        self,
        sql: str,
        *,  # keyword-only arguments:
        ttl: float | int | timedelta | None = None,
        show_spinner: bool | str = "Running `snowflake.query(...)`.",
        params: Any = None,
        **kwargs: Any,
    ) -> DataFrame:
        """Run a read-only SQL query.

        This method implements query result caching and simple error
        handling/retries. The caching behavior is identical to that of using
        ``@st.cache_data``. The SQL string, ``params`` and any additional
        keyword arguments are all part of the cache key.

        .. note::
            Queries that are run without a specified ``ttl`` are cached
            indefinitely.

        Parameters
        ----------
        sql : str
            The read-only SQL query to execute.
        ttl : float, int, timedelta or None
            The maximum number of seconds to keep results in the cache. If this
            is ``None`` (default), cached results do not expire with time.
        show_spinner : boolean or string
            Whether to enable the spinner. When a cached query is executed, no
            spinner is displayed because the result is immediately available.
            When a new query is executed, the default is to show a spinner with
            the message "Running ``snowflake.query(...)``."

            If this is ``False``, no spinner displays while executing the
            query. If this is a string, the string will be used as the message
            for the spinner.
        params : list, tuple, dict or None
            List of parameters to pass to the Snowflake Connector for Python
            ``Cursor.execute()`` method. This connector supports binding data
            to a SQL statement using qmark bindings. For more information and
            examples, see the `Snowflake Connector for Python documentation `_.
            This defaults to ``None``.

        Returns
        -------
        pandas.DataFrame
            The result of running the query, formatted as a pandas DataFrame.

        Example
        -------

        >>> import streamlit as st
        >>>
        >>> conn = st.connection("snowflake")
        >>> df = conn.query("SELECT * FROM my_table")
        >>> st.dataframe(df)

        """
        from tenacity import retry, retry_if_exception, stop_after_attempt, wait_fixed

        # `params` and `**kwargs` are explicit arguments of the cached helper
        # (rather than closure variables) so that they take part in the
        # `st.cache_data` key. Otherwise the same SQL text executed with
        # different bindings would return the first call's cached result.
        @retry(
            after=lambda _: self.reset(),
            stop=stop_after_attempt(3),
            reraise=True,
            # We don't have to implement retries ourself for most error types as the
            # `snowflake-connector-python` library already implements retries for
            # retryable HTTP errors.
            retry=retry_if_exception(
                lambda e: getattr(e, "sqlstate", None)
                == SQLSTATE_CONNECTION_WAS_NOT_ESTABLISHED
            ),
            wait=wait_fixed(1),
        )
        def _query(sql: str, params: Any = None, **kwargs: Any) -> DataFrame:
            cur = self._instance.cursor()
            cur.execute(sql, params=params, **kwargs)
            return cur.fetch_pandas_all()

        # We modify our helper function's `__qualname__` here to work around default
        # `@st.cache_data` behavior. Otherwise, `.query()` being called with different
        # `ttl` values will reset the cache with each call, and the query caches won't
        # be scoped by connection.
        # Avoid adding extra `.` characters to `__qualname__`
        ttl_str = str(ttl).replace(".", "_")
        _query.__qualname__ = f"{_query.__qualname__}_{self._connection_name}_{ttl_str}"
        _query = cache_data(
            show_spinner=show_spinner,
            ttl=ttl,
        )(_query)

        return _query(sql, params, **kwargs)

    def write_pandas(
        self,
        df: DataFrame,
        table_name: str,
        database: str | None = None,
        schema: str | None = None,
        chunk_size: int | None = None,
        **kwargs: Any,
    ) -> tuple[bool, int, int]:
        """Write a ``pandas.DataFrame`` to a table in a Snowflake database.

        This convenience method is a thin wrapper around
        ``snowflake.connector.pandas_tools.write_pandas()`` using the
        underlying connection. The ``conn`` parameter is passed automatically.
        For more information and additional keyword arguments, see the
        `Snowflake Connector for Python documentation `_.

        Parameters
        ----------
        df: pandas.DataFrame
            The ``pandas.DataFrame`` object containing the data to be copied
            into the table.
        table_name: str
            Name of the table where the data should be copied to.
        database: str
            Name of the database containing the table. By default, the function
            writes to the database that is currently in use in the session.

            .. Note::
                If you specify this parameter, you must also specify the schema
                parameter.

        schema: str
            Name of the schema containing the table. By default, the function
            writes to the table in the schema that is currently in use in the
            session.
        chunk_size: int
            Number of elements to insert at a time. By default, the function
            inserts all elements in one chunk.
        **kwargs: Any
            Additional keyword arguments for
            ``snowflake.connector.pandas_tools.write_pandas()``.

        Returns
        -------
        tuple[bool, int, int]
            A tuple containing three values:

            1. A boolean value that is ``True`` if the write was successful.
            2. An integer giving the number of chunks of data that were copied.
            3. An integer giving the number of rows that were inserted.

        Example
        -------
        The following example uses the database and schema currently in use in
        the session and copies the data into a table named "my_table."

        >>> import streamlit as st
        >>> import pandas as pd
        >>>
        >>> df = pd.DataFrame(
        ...     {"Name": ["Mary", "John", "Robert"], "Pet": ["dog", "cat", "bird"]}
        ... )
        >>> conn = st.connection("snowflake")
        >>> conn.write_pandas(df, "my_table")

        """
        from snowflake.connector.pandas_tools import write_pandas  # type:ignore[import]

        # The connector returns a fourth element that this wrapper discards.
        success, nchunks, nrows, _ = write_pandas(
            conn=self._instance,
            df=df,
            table_name=table_name,
            database=database,
            schema=schema,
            chunk_size=chunk_size,
            **kwargs,
        )

        return (success, nchunks, nrows)

    def cursor(self) -> SnowflakeCursor:
        """Create a new cursor object from this connection.

        Snowflake Connector cursors implement the Python Database API v2.0
        specification (PEP-249). For more information, see the
        `Snowflake Connector for Python documentation `_.

        Returns
        -------
        snowflake.connector.cursor.SnowflakeCursor
            A cursor object for the connection.

        Example
        -------
        The following example uses a cursor to insert multiple rows into a
        table. The ``qmark`` parameter style is specified as an optional
        keyword argument. Alternatively, the parameter style can be declared in
        your connection configuration file. For more information, see the
        `Snowflake Connector for Python documentation `_.

        >>> import streamlit as st
        >>>
        >>> conn = st.connection("snowflake", paramstyle="qmark")
        >>> rows_to_insert = [("Mary", "dog"), ("John", "cat"), ("Robert", "bird")]
        >>> conn.cursor().executemany(
        ...     "INSERT INTO mytable (name, pet) VALUES (?, ?)", rows_to_insert
        ... )

        """
        return self._instance.cursor()

    @property
    def raw_connection(self) -> InternalSnowflakeConnection:
        """Access the underlying connection object from the Snowflake\
        Connector for Python.

        For information on how to use the Snowflake Connector for Python, see
        the `Snowflake Connector for Python documentation `_.

        Returns
        -------
        snowflake.connector.connection.SnowflakeConnection
            The connection object.

        Example
        -------
        The following example uses a cursor to submit an asynchronous query,
        saves the query ID, then periodically checks the query status through
        the connection before retrieving the results.

        >>> import streamlit as st
        >>> import time
        >>>
        >>> conn = st.connection("snowflake")
        >>> cur = conn.cursor()
        >>> cur.execute_async("SELECT * FROM my_table")
        >>> query_id = cur.sfqid
        >>> while True:
        ...     status = conn.raw_connection.get_query_status(query_id)
        ...     if conn.raw_connection.is_still_running(status):
        ...         time.sleep(1)
        ...     else:
        ...         break
        >>> cur.get_results_from_sfqid(query_id)
        >>> df = cur.fetchall()

        """
        return self._instance

    def session(self) -> Session:
        """Create a new Snowpark session from this connection.

        For information on how to use Snowpark sessions, see the
        `Snowpark developer guide `_ and `Snowpark API Reference `_.

        Returns
        -------
        snowflake.snowpark.Session
            A new Snowpark session for this connection.

        Example
        -------
        The following example creates a new Snowpark session and uses it to run
        a query.

        >>> import streamlit as st
        >>>
        >>> conn = st.connection("snowflake")
        >>> session = conn.session()
        >>> df = session.sql("SELECT * FROM my_table").collect()

        """
        from snowflake.snowpark.context import get_active_session  # type:ignore[import]
        from snowflake.snowpark.session import Session  # type:ignore[import]

        # In Streamlit in Snowflake the already-active session is returned
        # instead of building a new one on top of the connection.
        if running_in_sis():
            return get_active_session()

        return cast(
            "Session", Session.builder.configs({"connection": self._instance}).create()
        )