team-10/env/Lib/site-packages/narwhals/_ibis/namespace.py
2025-08-02 07:34:44 +02:00

157 lines
5.3 KiB
Python

from __future__ import annotations
import operator
from functools import reduce
from itertools import chain
from typing import TYPE_CHECKING, Any, cast
import ibis
import ibis.expr.types as ir
from narwhals._expression_parsing import (
combine_alias_output_names,
combine_evaluate_output_names,
)
from narwhals._ibis.dataframe import IbisLazyFrame
from narwhals._ibis.expr import IbisExpr
from narwhals._ibis.selectors import IbisSelectorNamespace
from narwhals._ibis.utils import function, lit, narwhals_to_native_dtype
from narwhals._sql.namespace import SQLNamespace
from narwhals._sql.when_then import SQLThen, SQLWhen
from narwhals._utils import Implementation, requires
if TYPE_CHECKING:
from collections.abc import Iterable, Sequence
from narwhals._utils import Version
from narwhals.typing import ConcatMethod, IntoDType, PythonLiteral
class IbisNamespace(SQLNamespace[IbisLazyFrame, IbisExpr, "ir.Table", "ir.Value"]):
_implementation: Implementation = Implementation.IBIS
def __init__(self, *, version: Version) -> None:
self._version = version
@property
def selectors(self) -> IbisSelectorNamespace:
return IbisSelectorNamespace.from_namespace(self)
@property
def _expr(self) -> type[IbisExpr]:
return IbisExpr
@property
def _lazyframe(self) -> type[IbisLazyFrame]:
return IbisLazyFrame
def _function(self, name: str, *args: ir.Value | PythonLiteral) -> ir.Value:
return function(name, *args)
def _lit(self, value: Any) -> ir.Value:
return lit(value)
def _when(self, condition: ir.Value, value: ir.Value) -> ir.Value:
return ibis.cases((condition, value))
def _coalesce(self, *exprs: ir.Value) -> ir.Value:
return ibis.coalesce(*exprs)
def concat(
self, items: Iterable[IbisLazyFrame], *, how: ConcatMethod
) -> IbisLazyFrame:
if how == "diagonal":
msg = "diagonal concat not supported for Ibis. Please join instead."
raise NotImplementedError(msg)
items = list(items)
native_items = [item.native for item in items]
schema = items[0].schema
if not all(x.schema == schema for x in items[1:]):
msg = "inputs should all have the same schema"
raise TypeError(msg)
return self._lazyframe.from_native(ibis.union(*native_items), context=self)
def concat_str(
self, *exprs: IbisExpr, separator: str, ignore_nulls: bool
) -> IbisExpr:
def func(df: IbisLazyFrame) -> list[ir.Value]:
cols = list(chain.from_iterable(expr(df) for expr in exprs))
cols_casted = [s.cast("string") for s in cols]
if not ignore_nulls:
result = cols_casted[0]
for col in cols_casted[1:]:
result = result + separator + col
else:
sep = cast("ir.StringValue", lit(separator))
result = sep.join(cols_casted)
return [result]
return self._expr(
call=func,
evaluate_output_names=combine_evaluate_output_names(*exprs),
alias_output_names=combine_alias_output_names(*exprs),
version=self._version,
)
def mean_horizontal(self, *exprs: IbisExpr) -> IbisExpr:
def func(cols: Iterable[ir.Value]) -> ir.Value:
cols = list(cols)
return reduce(operator.add, (col.fill_null(lit(0)) for col in cols)) / reduce(
operator.add, (col.isnull().ifelse(lit(0), lit(1)) for col in cols)
)
return self._expr._from_elementwise_horizontal_op(func, *exprs)
@requires.backend_version((10, 0))
def when(self, predicate: IbisExpr) -> IbisWhen:
return IbisWhen.from_expr(predicate, context=self)
def lit(self, value: Any, dtype: IntoDType | None) -> IbisExpr:
def func(_df: IbisLazyFrame) -> Sequence[ir.Value]:
ibis_dtype = narwhals_to_native_dtype(dtype, self._version) if dtype else None
return [lit(value, ibis_dtype)]
return self._expr(
func,
evaluate_output_names=lambda _df: ["literal"],
alias_output_names=None,
version=self._version,
)
def len(self) -> IbisExpr:
def func(_df: IbisLazyFrame) -> list[ir.Value]:
return [_df.native.count()]
return self._expr(
call=func,
evaluate_output_names=lambda _df: ["len"],
alias_output_names=None,
version=self._version,
)
class IbisWhen(SQLWhen["IbisLazyFrame", "ir.Value", IbisExpr]):
lit = lit
@property
def _then(self) -> type[IbisThen]:
return IbisThen
def __call__(self, df: IbisLazyFrame) -> Sequence[ir.Value]:
is_expr = self._condition._is_expr
condition = df._evaluate_expr(self._condition)
then_ = self._then_value
then = df._evaluate_expr(then_) if is_expr(then_) else lit(then_)
other_ = self._otherwise_value
if other_ is None:
result = ibis.cases((condition, then))
else:
otherwise = df._evaluate_expr(other_) if is_expr(other_) else lit(other_)
result = ibis.cases((condition, then), else_=otherwise)
return [result]
class IbisThen(SQLThen["IbisLazyFrame", "ir.Value", IbisExpr], IbisExpr): ...