team-10/env/Lib/site-packages/narwhals/_pandas_like/dataframe.py
2025-08-02 07:34:44 +02:00

1139 lines
41 KiB
Python

from __future__ import annotations
from collections.abc import Iterable, Iterator, Mapping, Sequence
from itertools import chain, product
from typing import TYPE_CHECKING, Any, Callable, Literal, cast, overload
import numpy as np
from narwhals._compliant import EagerDataFrame
from narwhals._pandas_like.series import PANDAS_TO_NUMPY_DTYPE_MISSING, PandasLikeSeries
from narwhals._pandas_like.utils import (
align_and_extract_native,
get_dtype_backend,
import_array_module,
iter_dtype_backends,
native_to_narwhals_dtype,
object_native_to_narwhals_dtype,
rename,
select_columns_by_name,
set_index,
)
from narwhals._typing_compat import assert_never
from narwhals._utils import (
Implementation,
_into_arrow_table,
_remap_full_join_keys,
check_column_names_are_unique,
exclude_column_names,
generate_temporary_column_name,
parse_columns_to_drop,
scale_bytes,
)
from narwhals.dependencies import is_pandas_like_dataframe
from narwhals.exceptions import InvalidOperationError, ShapeError
if TYPE_CHECKING:
from io import BytesIO
from pathlib import Path
from types import ModuleType
import pandas as pd
import polars as pl
from typing_extensions import Self, TypeAlias, TypeIs
from narwhals._compliant.typing import CompliantDataFrameAny, CompliantLazyFrameAny
from narwhals._pandas_like.expr import PandasLikeExpr
from narwhals._pandas_like.group_by import PandasLikeGroupBy
from narwhals._pandas_like.namespace import PandasLikeNamespace
from narwhals._translate import IntoArrowTable
from narwhals._utils import Version, _LimitedContext
from narwhals.dtypes import DType
from narwhals.schema import Schema
from narwhals.typing import (
AsofJoinStrategy,
DTypeBackend,
JoinStrategy,
PivotAgg,
SizedMultiIndexSelector,
SizedMultiNameSelector,
SizeUnit,
UniqueKeepStrategy,
_2DArray,
_SliceIndex,
_SliceName,
)
Constructor: TypeAlias = Callable[..., pd.DataFrame]
# NumPy dtypes for which `PandasLikeDataFrame.to_numpy` takes its fast path
# (a frame whose column dtypes are all in this set is converted directly).
CLASSICAL_NUMPY_DTYPES: frozenset[np.dtype[Any]] = frozenset(
    np.dtype(dtype_name)
    for dtype_name in (
        # floating point
        "float64",
        "float32",
        # signed integers
        "int64",
        "int32",
        "int16",
        "int8",
        # unsigned integers
        "uint64",
        "uint32",
        "uint16",
        "uint8",
        "bool",
        # datetimes at every supported resolution
        "datetime64[s]",
        "datetime64[ms]",
        "datetime64[us]",
        "datetime64[ns]",
        # timedeltas at every supported resolution
        "timedelta64[s]",
        "timedelta64[ms]",
        "timedelta64[us]",
        "timedelta64[ns]",
        "object",
    )
)
class PandasLikeDataFrame(
EagerDataFrame["PandasLikeSeries", "PandasLikeExpr", "Any", "pd.Series[Any]"]
):
def __init__(
self,
native_dataframe: Any,
*,
implementation: Implementation,
version: Version,
validate_column_names: bool,
validate_backend_version: bool = False,
) -> None:
self._native_frame = native_dataframe
self._implementation = implementation
self._version = version
if validate_column_names:
check_column_names_are_unique(native_dataframe.columns)
if validate_backend_version:
self._validate_backend_version()
@classmethod
def from_arrow(cls, data: IntoArrowTable, /, *, context: _LimitedContext) -> Self:
    """Build a frame for `context`'s backend from Arrow-convertible `data`.

    The data is first turned into an Arrow table via `_into_arrow_table` and
    then converted with the backend-specific routine.
    """
    impl = context._implementation
    table = _into_arrow_table(data, context)
    if impl.is_pandas():
        return cls.from_native(table.to_pandas(), context=context)
    if impl.is_modin():  # pragma: no cover
        from modin.pandas.utils import (
            from_arrow as mpd_from_arrow,  # pyright: ignore[reportAttributeAccessIssue]
        )

        return cls.from_native(mpd_from_arrow(table), context=context)
    if impl.is_cudf():  # pragma: no cover
        cudf_frame = impl.to_native_namespace().DataFrame.from_arrow(table)
        return cls.from_native(cudf_frame, context=context)
    msg = "congratulations, you entered unreachable code - please report a bug"  # pragma: no cover
    raise AssertionError(msg)  # pragma: no cover
@classmethod
def from_dict(
    cls,
    data: Mapping[str, Any],
    /,
    *,
    context: _LimitedContext,
    schema: Mapping[str, DType] | Schema | None,
) -> Self:
    """Build a frame from a mapping of column name to column values.

    Values that are native Series are aligned (through
    `align_and_extract_native`) against the first native Series found in
    `data`; every other value is handed to the native `DataFrame.from_dict`
    untouched. When `schema` is given, the result is cast to it.

    Arguments:
        data: Column name to column values.
        context: Supplies the implementation and version of the result.
        schema: Optional target schema; also provides the column names when
            `data` is empty.
    """
    from narwhals.schema import Schema

    implementation = context._implementation
    ns = implementation.to_native_namespace()
    Series = cast("type[pd.Series[Any]]", ns.Series)  # noqa: N806
    DataFrame = cast("type[pd.DataFrame]", ns.DataFrame)  # noqa: N806
    aligned_data: dict[str, pd.Series[Any] | Any] = {}
    # First native Series encountered; later Series are aligned against it.
    left_most: PandasLikeSeries | None = None
    for name, series in data.items():
        if isinstance(series, Series):
            compliant = PandasLikeSeries.from_native(series, context=context)
            if left_most is None:
                left_most = compliant
                aligned_data[name] = series
            else:
                aligned_data[name] = align_and_extract_native(left_most, compliant)[1]
        else:
            # Non-Series values are passed through as-is.
            aligned_data[name] = series
    if aligned_data or not schema:
        native = DataFrame.from_dict(aligned_data)
    else:
        # No data but a schema: create empty columns named after the schema.
        native = DataFrame.from_dict({col: [] for col in schema})
    if schema:
        backend: Iterable[DTypeBackend] | None = None
        if aligned_data:
            # Dtype backends are only derived from actual data.
            backend = iter_dtype_backends(native.dtypes, implementation)
        native = native.astype(Schema(schema).to_pandas(backend))
    return cls.from_native(native, context=context)
@staticmethod
def _is_native(obj: Any) -> TypeIs[Any]:
    """Return the result of `is_pandas_like_dataframe(obj)`."""
    return is_pandas_like_dataframe(obj)  # pragma: no cover
@classmethod
def from_native(cls, data: Any, /, *, context: _LimitedContext) -> Self:
    """Wrap the native frame `data`, taking implementation and version from `context`.

    Column-name validation is always enabled on this path.
    """
    return cls(
        data,
        implementation=context._implementation,
        version=context._version,
        validate_column_names=True,
    )
@classmethod
def from_numpy(
    cls,
    data: _2DArray,
    /,
    *,
    context: _LimitedContext,
    schema: Mapping[str, DType] | Schema | Sequence[str] | None,
) -> Self:
    """Build a frame from a 2D array.

    Arguments:
        data: The array holding the values.
        context: Supplies the implementation and version of the result.
        schema: Either a name-to-dtype mapping (columns are named and cast
            accordingly) or anything accepted by `_numpy_column_names`.
    """
    from narwhals.schema import Schema

    impl = context._implementation
    make_frame: Constructor = impl.to_native_namespace().DataFrame
    if not isinstance(schema, (Mapping, Schema)):
        # Names only (or nothing at all): no dtype casting required.
        names = cls._numpy_column_names(data, schema)
        return cls.from_native(make_frame(data, columns=names), context=context)
    backends: Iterable[DTypeBackend] = (
        get_dtype_backend(dtype, impl) for dtype in schema.values()
    )
    uncast = make_frame(data, columns=schema.keys())
    native = uncast.astype(Schema(schema).to_pandas(backends))
    return cls.from_native(native, context=context)
def __narwhals_dataframe__(self) -> Self:
    """Return `self`."""
    return self
def __narwhals_lazyframe__(self) -> Self:
    """Return `self`."""
    return self
def __narwhals_namespace__(self) -> PandasLikeNamespace:
    """Return a `PandasLikeNamespace` for this frame's implementation and version."""
    # Imported locally rather than at module level (presumably to avoid a
    # circular import - confirm before moving).
    from narwhals._pandas_like.namespace import PandasLikeNamespace

    return PandasLikeNamespace(self._implementation, version=self._version)
def __native_namespace__(self) -> ModuleType:
    """Return the module given by this implementation's `to_native_namespace()`.

    Raises:
        AssertionError: If the implementation is not pandas, Modin or cuDF.
    """
    if self._implementation in {
        Implementation.PANDAS,
        Implementation.MODIN,
        Implementation.CUDF,
    }:
        return self._implementation.to_native_namespace()
    # Report the offending implementation itself: its type is the same for
    # every member of the enum and would not identify what went wrong.
    msg = f"Expected pandas/modin/cudf, got: {self._implementation}"  # pragma: no cover
    raise AssertionError(msg)
def __len__(self) -> int:
    """Return `len` of the native frame."""
    return len(self.native)
def _with_version(self, version: Version) -> Self:
    """Return a new wrapper around the same native frame using `version`.

    Column names are not re-validated.
    """
    return self.__class__(
        self.native,
        implementation=self._implementation,
        version=version,
        validate_column_names=False,
    )
def _with_native(self, df: Any, *, validate_column_names: bool = True) -> Self:
    """Wrap the native frame `df`, reusing this frame's implementation and version.

    Arguments:
        df: Native frame to wrap.
        validate_column_names: Forwarded to the constructor; defaults to True.
    """
    return self.__class__(
        df,
        implementation=self._implementation,
        version=self._version,
        validate_column_names=validate_column_names,
    )
def _extract_comparand(self, other: PandasLikeSeries) -> pd.Series[Any]:
index = self.native.index
if other._broadcast:
s = other.native
return type(s)(s.iloc[0], index=index, dtype=s.dtype, name=s.name)
if (len_other := len(other)) != (len_idx := len(index)):
msg = f"Expected object of length {len_idx}, got: {len_other}."
raise ShapeError(msg)
if other.native.index is not index:
return set_index(other.native, index, implementation=other._implementation)
return other.native
@property
def _array_funcs(self):  # type: ignore[no-untyped-def] # noqa: ANN202
    """Array module for this backend, as returned by `import_array_module`.

    Under `TYPE_CHECKING` the property is presented to type checkers as NumPy.
    """
    if TYPE_CHECKING:
        import numpy as np

        return np
    else:
        return import_array_module(self._implementation)
def get_column(self, name: str) -> PandasLikeSeries:
    """Return the native column `name` wrapped as a `PandasLikeSeries`."""
    return PandasLikeSeries.from_native(self.native[name], context=self)
def __array__(self, dtype: Any = None, *, copy: bool | None = None) -> _2DArray:
    """Forward to `to_numpy` with the same `dtype` and `copy`."""
    return self.to_numpy(dtype=dtype, copy=copy)
def _gather(self, rows: SizedMultiIndexSelector[pd.Series[Any]]) -> Self:
items = list(rows) if isinstance(rows, tuple) else rows
return self._with_native(self.native.iloc[items, :])
def _gather_slice(self, rows: _SliceIndex | range) -> Self:
return self._with_native(
self.native.iloc[slice(rows.start, rows.stop, rows.step), :],
validate_column_names=False,
)
def _select_slice_name(self, columns: _SliceName) -> Self:
start = (
self.native.columns.get_loc(columns.start)
if columns.start is not None
else None
)
stop = (
self.native.columns.get_loc(columns.stop) + 1
if columns.stop is not None
else None
)
selector = slice(start, stop, columns.step)
return self._with_native(
self.native.iloc[:, selector], validate_column_names=False
)
def _select_slice_index(self, columns: _SliceIndex | range) -> Self:
    """Select columns by position using `columns` as the `iloc` column indexer."""
    return self._with_native(
        self.native.iloc[:, columns], validate_column_names=False
    )
def _select_multi_index(
self, columns: SizedMultiIndexSelector[pd.Series[Any]]
) -> Self:
columns = list(columns) if isinstance(columns, tuple) else columns
return self._with_native(
self.native.iloc[:, columns], validate_column_names=False
)
def _select_multi_name(self, columns: SizedMultiNameSelector[pd.Series[Any]]) -> Self:
    """Select columns by label using `columns` as the `loc` column indexer."""
    return self._with_native(self.native.loc[:, columns])
# --- properties ---
@property
def columns(self) -> list[str]:
    """Native column labels as a list."""
    return self.native.columns.tolist()
@overload
def rows(self, *, named: Literal[True]) -> list[dict[str, Any]]: ...
@overload
def rows(self, *, named: Literal[False]) -> list[tuple[Any, ...]]: ...
@overload
def rows(self, *, named: bool) -> list[tuple[Any, ...]] | list[dict[str, Any]]: ...
def rows(self, *, named: bool) -> list[tuple[Any, ...]] | list[dict[str, Any]]:
if not named:
# cuDF does not support itertuples. But it does support to_dict!
if self._implementation is Implementation.CUDF:
# Extract the row values from the named rows
return [tuple(row.values()) for row in self.rows(named=True)]
return list(self.native.itertuples(index=False, name=None))
return self.native.to_dict(orient="records")
def iter_columns(self) -> Iterator[PandasLikeSeries]:
    """Yield each native column wrapped as a `PandasLikeSeries`, in column order."""
    yield from (
        PandasLikeSeries.from_native(native_series, context=self)
        for _label, native_series in self.native.items()
    )

_iter_columns = iter_columns
def iter_rows(
self, *, named: bool, buffer_size: int
) -> Iterator[tuple[Any, ...]] | Iterator[dict[str, Any]]:
# The param ``buffer_size`` is only here for compatibility with the Polars API
# and has no effect on the output.
if not named:
yield from self.native.itertuples(index=False, name=None)
else:
col_names = self.native.columns
for row in self.native.itertuples(index=False):
yield dict(zip(col_names, row))
@property
def schema(self) -> dict[str, DType]:
    """Map each column name to its Narwhals dtype.

    Columns whose native dtype compares equal to `"object"` are resolved from
    the column itself via `object_native_to_narwhals_dtype`; all others are
    resolved from the native dtype alone.
    """
    dtype_by_column = self.native.dtypes
    resolved: dict[str, DType] = {}
    for column in self.native.columns:
        native_dtype = dtype_by_column[column]
        if native_dtype != "object":
            resolved[column] = native_to_narwhals_dtype(
                native_dtype, self._version, self._implementation
            )
        else:
            resolved[column] = object_native_to_narwhals_dtype(
                self.native[column], self._version, self._implementation
            )
    return resolved
def collect_schema(self) -> dict[str, DType]:
    """Return `self.schema`."""
    return self.schema
# --- reshape ---
def simple_select(self, *column_names: str) -> Self:
    """Select columns by name via `select_columns_by_name`, skipping name validation."""
    return self._with_native(
        select_columns_by_name(self.native, list(column_names), self._implementation),
        validate_column_names=False,
    )
def select(self, *exprs: PandasLikeExpr) -> Self:
    """Evaluate `exprs` and return a frame made only of the resulting columns.

    With no resulting series an empty frame is returned, like Polars does.
    """
    evaluated = self._evaluate_into_exprs(*exprs)
    if not evaluated:
        empty = type(self.native)()
        return self._with_native(empty, validate_column_names=False)
    aligned = evaluated[0]._align_full_broadcast(*evaluated)
    plx = self.__narwhals_namespace__()
    combined = plx._concat_horizontal([series.native for series in aligned])
    # `concat` creates a new object, so fine to modify `.columns.name` inplace.
    combined.columns.name = self.native.columns.name
    return self._with_native(combined, validate_column_names=True)
def drop_nulls(self, subset: Sequence[str] | None) -> Self:
if subset is None:
return self._with_native(
self.native.dropna(axis=0), validate_column_names=False
)
plx = self.__narwhals_namespace__()
mask = ~plx.any_horizontal(plx.col(*subset).is_null(), ignore_nulls=True)
return self.filter(mask)
def estimated_size(self, unit: SizeUnit) -> int | float:
    """Return the summed deep memory usage of the native frame, scaled to `unit`."""
    sz = self.native.memory_usage(deep=True).sum()
    return scale_bytes(sz, unit=unit)
def with_row_index(self, name: str, order_by: Sequence[str] | None) -> Self:
    """Prepend a row-index column called `name`.

    Arguments:
        name: Name of the new column.
        order_by: If None the index is `0..len-1` in current row order;
            otherwise it is the zero-based ordinal rank over these columns.
    """
    plx = self.__narwhals_namespace__()
    if order_by is not None:
        ordinal = plx.col(order_by[0]).rank(method="ordinal", descending=False)
        zero_based = ordinal.over(partition_by=[], order_by=order_by) - 1
        return self.select(zero_based.alias(name), plx.all())
    positions = self._array_funcs.arange(len(self))
    index_series = plx._series.from_iterable(
        positions, context=self, index=self.native.index, name=name
    )
    return self.select(plx._expr._from_series(index_series), plx.all())
def row(self, index: int) -> tuple[Any, ...]:
return tuple(x for x in self.native.iloc[index])
def filter(self, predicate: PandasLikeExpr | list[bool]) -> Self:
if isinstance(predicate, list):
mask_native: pd.Series[Any] | list[bool] = predicate
else:
# `[0]` is safe as the predicate's expression only returns a single column
mask = self._evaluate_into_exprs(predicate)[0]
mask_native = self._extract_comparand(mask)
return self._with_native(
self.native.loc[mask_native], validate_column_names=False
)
def with_columns(self, *exprs: PandasLikeExpr) -> Self:
    """Add or replace columns with the results of `exprs`.

    Existing columns keep their position (replaced in place when an
    expression yields the same name); new columns are appended afterwards.
    """
    evaluated = self._evaluate_into_exprs(*exprs)
    if not evaluated and len(self) == 0:
        return self
    pending: dict[str, PandasLikeSeries] = {series.name: series for series in evaluated}
    native = self.native
    # Make sure to preserve column order: walk the existing columns first,
    # consuming any replacement found under the same name.
    pieces = [
        self._extract_comparand(pending.pop(label)) if label in pending else native[label]
        for label in native.columns
    ]
    # Whatever is left was not an existing column, so it goes at the end.
    pieces.extend(self._extract_comparand(series) for series in pending.values())
    plx = self.__narwhals_namespace__()
    combined = plx._concat_horizontal(pieces)
    # `concat` creates a new object, so fine to modify `.columns.name` inplace.
    combined.columns.name = native.columns.name
    return self._with_native(combined, validate_column_names=False)
def rename(self, mapping: Mapping[str, str]) -> Self:
    """Rename columns according to `mapping` using the module-level `rename` helper."""
    return self._with_native(
        rename(self.native, columns=mapping, implementation=self._implementation)
    )
def drop(self, columns: Sequence[str], *, strict: bool) -> Self:
    """Drop `columns`; `strict` is forwarded to `parse_columns_to_drop`."""
    doomed = parse_columns_to_drop(self, columns, strict=strict)
    remaining = self.native.drop(columns=doomed)
    return self._with_native(remaining, validate_column_names=False)
# --- transform ---
def sort(self, *by: str, descending: bool | Sequence[bool], nulls_last: bool) -> Self:
df = self.native
if isinstance(descending, bool):
ascending: bool | list[bool] = not descending
else:
ascending = [not d for d in descending]
na_position = "last" if nulls_last else "first"
return self._with_native(
df.sort_values(list(by), ascending=ascending, na_position=na_position),
validate_column_names=False,
)
# --- convert ---
def collect(
    self, backend: Implementation | None, **kwargs: Any
) -> CompliantDataFrameAny:
    """Materialise this frame as an eager frame of the requested backend.

    Arguments:
        backend: None keeps the current backend; pandas, PyArrow and Polars
            are the supported targets.
        **kwargs: Accepted but not used here.

    Raises:
        ValueError: If `backend` is not one of the supported targets.
    """
    version = self._version
    if backend is None:
        return PandasLikeDataFrame(
            self.native,
            implementation=self._implementation,
            version=version,
            validate_column_names=False,
        )
    if backend is Implementation.PANDAS:
        return PandasLikeDataFrame(
            self.to_pandas(),
            implementation=Implementation.PANDAS,
            version=version,
            validate_column_names=False,
            # Only validate the pandas version when actually switching backend.
            validate_backend_version=backend is not self._implementation,
        )
    if backend is Implementation.PYARROW:
        from narwhals._arrow.dataframe import ArrowDataFrame

        return ArrowDataFrame(
            native_dataframe=self.to_arrow(),
            validate_backend_version=True,
            version=version,
            validate_column_names=False,
        )
    if backend is Implementation.POLARS:
        from narwhals._polars.dataframe import PolarsDataFrame

        return PolarsDataFrame(
            df=self.to_polars(), validate_backend_version=True, version=version
        )
    msg = f"Unsupported `backend` value: {backend}"  # pragma: no cover
    raise ValueError(msg)  # pragma: no cover
# --- actions ---
def group_by(
    self, keys: Sequence[str] | Sequence[PandasLikeExpr], *, drop_null_keys: bool
) -> PandasLikeGroupBy:
    """Return a `PandasLikeGroupBy` over `keys` for this frame."""
    from narwhals._pandas_like.group_by import PandasLikeGroupBy

    return PandasLikeGroupBy(self, keys, drop_null_keys=drop_null_keys)
def _join_inner(
    self, other: Self, *, left_on: Sequence[str], right_on: Sequence[str], suffix: str
) -> pd.DataFrame:
    """Inner-merge the native frames; clashing right-hand columns get `suffix`.

    Returns a native frame, not a wrapped one.
    """
    return self.native.merge(
        other.native,
        left_on=left_on,
        right_on=right_on,
        how="inner",
        suffixes=("", suffix),
    )
def _join_left(
self, other: Self, *, left_on: Sequence[str], right_on: Sequence[str], suffix: str
) -> pd.DataFrame:
result_native = self.native.merge(
other.native,
how="left",
left_on=left_on,
right_on=right_on,
suffixes=("", suffix),
)
extra = [
right_key if right_key not in self.columns else f"{right_key}{suffix}"
for left_key, right_key in zip(left_on, right_on)
if right_key != left_key
]
# NOTE: Keep `inplace=True` to avoid making a redundant copy.
# This may need updating, depending on https://github.com/pandas-dev/pandas/pull/51466/files
result_native.drop(columns=extra, inplace=True) # noqa: PD002
return result_native
def _join_full(
    self, other: Self, *, left_on: Sequence[str], right_on: Sequence[str], suffix: str
) -> pd.DataFrame:
    """Outer-merge the native frames after renaming the right-hand keys.

    The right keys are renamed through `_remap_full_join_keys`, and the
    renamed right frame must still have unique column names. Returns a
    native frame.
    """
    # Pandas coalesces keys in full joins unless there's no collision
    key_renames = _remap_full_join_keys(left_on, right_on, suffix)
    right_native = other.native.rename(columns=key_renames)
    check_column_names_are_unique(right_native.columns)
    return self.native.merge(
        right_native,
        left_on=left_on,
        right_on=list(key_renames.values()),
        how="outer",
        suffixes=("", suffix),
    )
def _join_cross(self, other: Self, *, suffix: str) -> pd.DataFrame:
    """Return the cross product of the two native frames.

    Modin, cuDF and pandas older than 1.4 go through an inner merge on a
    temporary constant key column; otherwise `how="cross"` is used directly.
    Returns a native frame.
    """
    impl = self._implementation
    version = self._backend_version
    emulate_cross = (
        impl.is_modin()
        or impl.is_cudf()
        or (impl.is_pandas() and version < (1, 4))
    )
    if not emulate_cross:
        return self.native.merge(other.native, how="cross", suffixes=("", suffix))
    token = generate_temporary_column_name(
        n_bytes=8, columns=(*self.columns, *other.columns)
    )
    constant_key = {token: 0}
    merged = self.native.assign(**constant_key).merge(
        other.native.assign(**constant_key),
        how="inner",
        left_on=token,
        right_on=token,
        suffixes=("", suffix),
    )
    # NOTE: Keep `inplace=True` to avoid making a redundant copy.
    # This may need updating, depending on https://github.com/pandas-dev/pandas/pull/51466/files
    merged.drop(columns=token, inplace=True)  # noqa: PD002
    return merged
def _join_semi(
    self, other: Self, *, left_on: Sequence[str], right_on: Sequence[str]
) -> pd.DataFrame:
    """Keep the left rows whose keys occur in `other`.

    The right frame is first reduced by `_join_filter_rename` to its key
    columns, renamed to the left key names. Returns a native frame.
    """
    right_to_left = dict(zip(right_on, left_on))
    right_keys = self._join_filter_rename(
        other=other,
        columns_to_select=list(right_on),
        columns_mapping=right_to_left,
    )
    return self.native.merge(
        right_keys, how="inner", left_on=left_on, right_on=left_on
    )
def _join_anti(
    self, other: Self, *, left_on: Sequence[str], right_on: Sequence[str]
) -> pd.DataFrame:
    """Keep the left rows whose keys do not occur in `other`.

    cuDF uses its `"leftanti"` merge. The other backends merge with an
    indicator column and keep only the rows marked `"left_only"`.
    Returns a native frame.
    """
    implementation = self._implementation
    if implementation.is_cudf():
        return self.native.merge(
            other.native, how="leftanti", left_on=left_on, right_on=right_on
        )
    # Temporary name for the merge indicator, chosen from both frames' columns.
    indicator_token = generate_temporary_column_name(
        n_bytes=8, columns=(*self.columns, *other.columns)
    )
    # Reduce the right frame to its key columns, renamed to the left key names.
    other_native = self._join_filter_rename(
        other=other,
        columns_to_select=list(right_on),
        columns_mapping=dict(zip(right_on, left_on)),
    )
    result_native = self.native.merge(
        other_native,
        # TODO(FBruzzesi): See https://github.com/modin-project/modin/issues/7384
        how="left" if implementation.is_pandas() else "outer",
        indicator=indicator_token,
        left_on=left_on,
        right_on=left_on,
    ).loc[lambda t: t[indicator_token] == "left_only"]
    # NOTE: Keep `inplace=True` to avoid making a redundant copy.
    # This may need updating, depending on https://github.com/pandas-dev/pandas/pull/51466/files
    result_native.drop(columns=indicator_token, inplace=True)  # noqa: PD002
    return result_native
def _join_filter_rename(
    self, other: Self, columns_to_select: list[str], columns_mapping: dict[str, str]
) -> pd.DataFrame:
    """Select, rename and de-duplicate columns of `other`.

    Avoids creating extra columns and row duplication in `"anti"` and
    `"semi"` joins.

    Note:
        A native object is returned.
    """
    impl = self._implementation
    selected = select_columns_by_name(
        other.native, column_names=columns_to_select, implementation=impl
    )
    renamed = rename(selected, columns=columns_mapping, implementation=impl)
    return renamed.drop_duplicates()
def join(
    self,
    other: Self,
    *,
    how: JoinStrategy,
    left_on: Sequence[str] | None,
    right_on: Sequence[str] | None,
    suffix: str,
) -> Self:
    """Join with `other` by dispatching on `how` to the matching `_join_*` helper.

    Arguments:
        other: Right-hand frame.
        how: Join strategy.
        left_on: Left key columns; may only be None for cross joins.
        right_on: Right key columns; may only be None for cross joins.
        suffix: Suffix for clashing right-hand column names (not used by
            the anti and semi joins).

    Raises:
        ValueError: If a non-cross join is requested without both key lists.
    """
    if how == "cross":
        result = self._join_cross(other=other, suffix=suffix)
    elif left_on is None or right_on is None:  # pragma: no cover
        # Every remaining strategy needs keys on both sides.
        raise ValueError(left_on, right_on)
    elif how == "inner":
        result = self._join_inner(
            other=other, left_on=left_on, right_on=right_on, suffix=suffix
        )
    elif how == "anti":
        result = self._join_anti(other=other, left_on=left_on, right_on=right_on)
    elif how == "semi":
        result = self._join_semi(other=other, left_on=left_on, right_on=right_on)
    elif how == "left":
        result = self._join_left(
            other=other, left_on=left_on, right_on=right_on, suffix=suffix
        )
    elif how == "full":
        result = self._join_full(
            other=other, left_on=left_on, right_on=right_on, suffix=suffix
        )
    else:
        # Exhaustiveness check over `JoinStrategy`.
        assert_never(how)
    return self._with_native(result)
def join_asof(
self,
other: Self,
*,
left_on: str,
right_on: str,
by_left: Sequence[str] | None,
by_right: Sequence[str] | None,
strategy: AsofJoinStrategy,
suffix: str,
) -> Self:
plx = self.__native_namespace__()
return self._with_native(
plx.merge_asof(
self.native,
other.native,
left_on=left_on,
right_on=right_on,
left_by=by_left,
right_by=by_right,
direction=strategy,
suffixes=("", suffix),
)
)
# --- partial reduction ---
def head(self, n: int) -> Self:
    """Return the result of the native `head(n)`, wrapped without name validation."""
    return self._with_native(self.native.head(n), validate_column_names=False)
def tail(self, n: int) -> Self:
    """Return the result of the native `tail(n)`, wrapped without name validation."""
    return self._with_native(self.native.tail(n), validate_column_names=False)
def unique(
self,
subset: Sequence[str] | None,
*,
keep: UniqueKeepStrategy,
maintain_order: bool | None = None,
) -> Self:
# The param `maintain_order` is only here for compatibility with the Polars API
# and has no effect on the output.
mapped_keep = {"none": False, "any": "first"}.get(keep, keep)
if subset and (error := self._check_columns_exist(subset)):
raise error
return self._with_native(
self.native.drop_duplicates(subset=subset, keep=mapped_keep),
validate_column_names=False,
)
# --- lazy-only ---
def lazy(self, *, backend: Implementation | None = None) -> CompliantLazyFrameAny:
pandas_df = self.to_pandas()
if backend is None:
return self
elif backend is Implementation.DUCKDB:
import duckdb # ignore-banned-import
from narwhals._duckdb.dataframe import DuckDBLazyFrame
return DuckDBLazyFrame(
df=duckdb.table("pandas_df"),
validate_backend_version=True,
version=self._version,
)
elif backend is Implementation.POLARS:
import polars as pl # ignore-banned-import
from narwhals._polars.dataframe import PolarsLazyFrame
return PolarsLazyFrame(
df=pl.from_pandas(pandas_df).lazy(),
validate_backend_version=True,
version=self._version,
)
elif backend is Implementation.DASK:
import dask.dataframe as dd # ignore-banned-import
from narwhals._dask.dataframe import DaskLazyFrame
return DaskLazyFrame(
native_dataframe=dd.from_pandas(pandas_df),
validate_backend_version=True,
version=self._version,
)
elif backend.is_ibis():
import ibis # ignore-banned-import
from narwhals._ibis.dataframe import IbisLazyFrame
return IbisLazyFrame(
ibis.memtable(pandas_df, columns=self.columns),
validate_backend_version=True,
version=self._version,
)
raise AssertionError # pragma: no cover
@property
def shape(self) -> tuple[int, int]:
    """The native frame's `shape`."""
    return self.native.shape
def to_dict(self, *, as_series: bool) -> dict[str, Any]:
if as_series:
return {
col: PandasLikeSeries.from_native(self.native[col], context=self)
for col in self.columns
}
return self.native.to_dict(orient="list")
def to_numpy(self, dtype: Any = None, *, copy: bool | None = None) -> _2DArray:
    """Convert the frame to a 2D NumPy array.

    Arguments:
        dtype: Forwarded to the native `to_numpy` when not None.
        copy: Forwarded to the native `to_numpy`; when None it becomes True
            for cuDF and False otherwise.
    """
    native_dtypes = self.native.dtypes
    if copy is None:
        # pandas default differs from Polars, but cuDF default is True
        copy = self._implementation is Implementation.CUDF
    if native_dtypes.isin(CLASSICAL_NUMPY_DTYPES).all():
        # Fast path, no conversions necessary.
        if dtype is not None:
            return self.native.to_numpy(dtype=dtype, copy=copy)
        return self.native.to_numpy(copy=copy)
    dtype_datetime = self._version.dtypes.Datetime
    # Time-zone-aware datetime columns get converted to UTC and then have
    # their time zone removed before the native conversion.
    to_convert = [
        key
        for key, val in self.schema.items()
        if isinstance(val, dtype_datetime) and val.time_zone is not None
    ]
    if to_convert:
        df = self.with_columns(
            self.__narwhals_namespace__()
            .col(*to_convert)
            .dt.convert_time_zone("UTC")
            .dt.replace_time_zone(None)
        ).native
    else:
        df = self.native
    if dtype is not None:
        return df.to_numpy(dtype=dtype, copy=copy)
    # pandas return `object` dtype for nullable dtypes if dtype=None,
    # so we cast each Series to numpy and let numpy find a common dtype.
    # If there aren't any dtypes where `to_numpy()` is "broken" (i.e. it
    # returns Object) then we just call `to_numpy()` on the DataFrame.
    for col_dtype in native_dtypes:
        if str(col_dtype) in PANDAS_TO_NUMPY_DTYPE_MISSING:
            # NOTE: the columns are taken from `self` (via `get_column`),
            # not from the time-zone-converted `df` built above.
            arr: Any = np.hstack(
                [
                    self.get_column(col).to_numpy(copy=copy, dtype=None)[:, None]
                    for col in self.columns
                ]
            )
            return arr
    return df.to_numpy(copy=copy)
def to_pandas(self) -> pd.DataFrame:
    """Return the data as a pandas DataFrame.

    A pandas-backed frame returns its native object as-is; cuDF and Modin
    frames are converted with `to_pandas()` and `_to_pandas()` respectively.

    Raises:
        AssertionError: If the implementation is none of the three above.
    """
    impl = self._implementation
    if impl is Implementation.PANDAS:
        return self.native
    if impl is Implementation.CUDF:
        return self.native.to_pandas()
    if impl is Implementation.MODIN:
        return self.native._to_pandas()
    msg = f"Unknown implementation: {impl}"  # pragma: no cover
    raise AssertionError(msg)
def to_polars(self) -> pl.DataFrame:
    """Convert to a Polars DataFrame by way of `to_pandas` and `pl.from_pandas`."""
    import polars as pl  # ignore-banned-import

    return pl.from_pandas(self.to_pandas())
def write_parquet(self, file: str | Path | BytesIO) -> None:
    """Write the frame to `file` using the native `to_parquet`."""
    self.native.to_parquet(file)
@overload
def write_csv(self, file: None) -> str: ...
@overload
def write_csv(self, file: str | Path | BytesIO) -> None: ...
def write_csv(self, file: str | Path | BytesIO | None) -> str | None:
    """Write the frame as CSV using the native `to_csv`, without the index.

    Returns whatever the native `to_csv` returns; per the overloads that is
    a string when `file` is None and None otherwise.
    """
    return self.native.to_csv(file, index=False)
# --- descriptive ---
def is_unique(self) -> PandasLikeSeries:
    """Return a boolean series: the negation of the native `duplicated(keep=False)`."""
    return PandasLikeSeries.from_native(
        ~self.native.duplicated(keep=False), context=self
    )
def item(self, row: int | None, column: int | str | None) -> Any:
if row is None and column is None:
if self.shape != (1, 1):
msg = (
"can only call `.item()` if the dataframe is of shape (1, 1),"
" or if explicit row/col values are provided;"
f" frame has shape {self.shape!r}"
)
raise ValueError(msg)
return self.native.iloc[0, 0]
elif row is None or column is None:
msg = "cannot call `.item()` with only one of `row` or `column`"
raise ValueError(msg)
_col = self.columns.index(column) if isinstance(column, str) else column
return self.native.iloc[row, _col]
def clone(self) -> Self:
    """Return a new wrapper around the native frame's `copy()`."""
    return self._with_native(self.native.copy(), validate_column_names=False)
def gather_every(self, n: int, offset: int) -> Self:
    """Take every `n`-th row, starting at position `offset`."""
    return self._with_native(self.native.iloc[offset::n], validate_column_names=False)
def _pivot_into_index_values(
    self,
    on: Sequence[str],
    index: Sequence[str] | None,
    values: Sequence[str] | None,
    /,
) -> tuple[Sequence[str], Sequence[str]]:
    """Fill in whichever of `index` and `values` was not provided.

    A missing `index` is derived with `exclude_column_names` from `on` (plus
    `values` when given); a missing `values` is derived the same way from
    `on` and the resolved `index`.
    """
    if not index:
        excluded = {*on, *values} if values else on
        index = exclude_column_names(self, excluded)
    if not values:
        values = exclude_column_names(self, {*on, *index})
    return index, values
@staticmethod
def _pivot_multi_on_name(unique_values: tuple[str, ...], /) -> str:
LB, RB, Q = "{", "}", '"' # noqa: N806
body = '","'.join(unique_values)
return f"{LB}{Q}{body}{Q}{RB}"
@staticmethod
def _pivot_single_on_names(
column_names: Iterable[str], n_values: int, separator: str, /
) -> list[str]:
if n_values > 1:
return [separator.join(col).strip() for col in column_names]
return [col[-1] for col in column_names]
def _pivot_multi_on_names(
self,
column_names: Iterable[tuple[str, ...]],
n_on: int,
n_values: int,
separator: str,
/,
) -> Iterator[str]:
if n_values > 1:
for col in column_names:
names = col[-n_on:]
prefix = col[0]
yield separator.join((prefix, self._pivot_multi_on_name(names)))
else:
for col in column_names:
yield self._pivot_multi_on_name(col[-n_on:])
def _pivot_remap_column_names(
self, column_names: Iterable[Any], *, n_on: int, n_values: int, separator: str
) -> list[str]:
"""Reformat output column names from a native pivot operation, to match `polars`.
Note:
`column_names` is a `pd.MultiIndex`, but not in the stubs.
"""
if n_on == 1:
return self._pivot_single_on_names(column_names, n_values, separator)
return list(self._pivot_multi_on_names(column_names, n_on, n_values, separator))
def _pivot_table(
    self,
    on: Sequence[str],
    index: Sequence[str],
    values: Sequence[str],
    aggregate_function: Literal[
        "min", "max", "first", "last", "sum", "mean", "median"
    ],
    /,
) -> Any:
    """Call the native `pivot_table` with `aggregate_function` as `aggfunc`.

    `observed=True` is passed for every backend except cuDF.
    """
    kwds: dict[Any, Any] = {}
    if self._implementation is not Implementation.CUDF:
        kwds["observed"] = True
    return self.native.pivot_table(
        values=values,
        index=index,
        columns=on,
        aggfunc=aggregate_function,
        margins=False,
        **kwds,
    )
def _pivot(
self,
on: Sequence[str],
index: Sequence[str],
values: Sequence[str],
aggregate_function: PivotAgg | None,
/,
) -> pd.DataFrame:
if aggregate_function is None:
return self.native.pivot(columns=on, index=index, values=values)
elif aggregate_function == "len":
return (
self.native.groupby([*on, *index], as_index=False)
.agg(dict.fromkeys(values, "size"))
.pivot(columns=on, index=index, values=values)
)
return self._pivot_table(on, index, values, aggregate_function)
def pivot(
    self,
    on: Sequence[str],
    *,
    index: Sequence[str] | None,
    values: Sequence[str] | None,
    aggregate_function: PivotAgg | None,
    sort_columns: bool,
    separator: str,
) -> Self:
    """Pivot the frame from long to wide format.

    Arguments:
        on: Columns whose unique values become the new columns.
        index: Columns kept as row identifiers; derived when None.
        values: Columns providing the cell values; derived when None.
        aggregate_function: Aggregation for duplicate cells, or None.
        sort_columns: Whether the unique `on` values are sorted before
            ordering the output columns.
        separator: Separator used when composing the output column names.

    Raises:
        NotImplementedError: For the Modin backend.
    """
    implementation = self._implementation
    if implementation.is_modin():
        msg = "pivot is not supported for Modin backend due to https://github.com/modin-project/modin/issues/7409."
        raise NotImplementedError(msg)
    index, values = self._pivot_into_index_values(on, index, values)
    result = self._pivot(on, index, values, aggregate_function)
    # Select the columns in the right order
    # (one list of unique values per `on` column, optionally sorted).
    uniques = (
        (
            self.get_column(col)
            .unique()
            .sort(descending=False, nulls_last=False)
            .to_list()
            for col in on
        )
        if sort_columns
        else (self.get_column(col).unique().to_list() for col in on)
    )
    # Every (value column, *on-values) combination, in output order.
    ordered_cols = list(product(values, *chain(uniques)))
    result = result.loc[:, ordered_cols]
    columns = result.columns
    remapped = self._pivot_remap_column_names(
        columns, n_on=len(on), n_values=len(values), separator=separator
    )
    result.columns = remapped  # type: ignore[assignment]
    result.columns.names = [""]
    # `reset_index` turns the `index` columns back into regular columns.
    return self._with_native(result.reset_index())
def to_arrow(self) -> Any:
    """Convert the frame to an Arrow table, without the native index.

    cuDF frames use their own `to_arrow`; every other backend goes through
    `pyarrow.Table.from_pandas`.
    """
    if self._implementation is Implementation.CUDF:
        return self.native.to_arrow(preserve_index=False)

    import pyarrow as pa  # ignore-banned-import()

    # `preserve_index=False` matches the cuDF branch. Without it a
    # non-RangeIndex (e.g. the one left behind by a row filter) would be
    # emitted as an extra `__index_level_0__` column.
    return pa.Table.from_pandas(self.native, preserve_index=False)
def sample(
    self,
    n: int | None,
    *,
    fraction: float | None,
    with_replacement: bool,
    seed: int | None,
) -> Self:
    """Sample rows using the native `sample`.

    Arguments:
        n: Passed as `n`.
        fraction: Passed as `frac`.
        with_replacement: Passed as `replace`.
        seed: Passed as `random_state`.
    """
    return self._with_native(
        self.native.sample(
            n=n, frac=fraction, replace=with_replacement, random_state=seed
        ),
        validate_column_names=False,
    )
def unpivot(
    self,
    on: Sequence[str] | None,
    index: Sequence[str] | None,
    variable_name: str,
    value_name: str,
) -> Self:
    """Unpivot the frame from wide to long format using the native `melt`.

    Arguments:
        on: Passed as `value_vars`.
        index: Passed as `id_vars`.
        variable_name: Passed as `var_name`.
        value_name: Passed as `value_name`.
    """
    return self._with_native(
        self.native.melt(
            id_vars=index,
            value_vars=on,
            var_name=variable_name,
            value_name=value_name,
        )
    )
def explode(self, columns: Sequence[str]) -> Self:
    """Explode list-typed columns into one row per element.

    Arguments:
        columns: Columns to explode; each must have a List dtype.

    Raises:
        InvalidOperationError: If any of `columns` is not of List dtype.
        ShapeError: If several columns are exploded and their per-row
            element counts differ.
    """
    dtypes = self._version.dtypes
    schema = self.collect_schema()
    # Validate every requested column before touching the data.
    for col_to_explode in columns:
        dtype = schema[col_to_explode]
        if dtype != dtypes.List:
            msg = (
                f"`explode` operation not supported for dtype `{dtype}`, "
                "expected List type"
            )
            raise InvalidOperationError(msg)
    if len(columns) == 1:
        return self._with_native(
            self.native.explode(columns[0]), validate_column_names=False
        )
    else:
        native_frame = self.native
        # Per-row element counts of the first column, used as the reference.
        anchor_series = native_frame[columns[0]].list.len()
        if not all(
            (native_frame[col_name].list.len() == anchor_series).all()
            for col_name in columns[1:]
        ):
            msg = "exploded columns must have matching element counts"
            raise ShapeError(msg)
        original_columns = self.columns
        other_columns = [c for c in original_columns if c not in columns]
        # Explode the first column together with all untouched columns...
        exploded_frame = native_frame[[*other_columns, columns[0]]].explode(
            columns[0]
        )
        # ...and every remaining column on its own.
        exploded_series = [
            native_frame[col_name].explode().to_frame() for col_name in columns[1:]
        ]
        plx = self.__native_namespace__()
        # Stitch the pieces back together and restore the original column order.
        return self._with_native(
            plx.concat([exploded_frame, *exploded_series], axis=1)[original_columns],
            validate_column_names=False,
        )