817 lines
27 KiB
Python
817 lines
27 KiB
Python
![]() |
# Licensed to the Apache Software Foundation (ASF) under one
|
||
|
# or more contributor license agreements. See the NOTICE file
|
||
|
# distributed with this work for additional information
|
||
|
# regarding copyright ownership. The ASF licenses this file
|
||
|
# to you under the Apache License, Version 2.0 (the
|
||
|
# "License"); you may not use this file except in compliance
|
||
|
# with the License. You may obtain a copy of the License at
|
||
|
#
|
||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||
|
#
|
||
|
# Unless required by applicable law or agreed to in writing,
|
||
|
# software distributed under the License is distributed on an
|
||
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||
|
# KIND, either express or implied. See the License for the
|
||
|
# specific language governing permissions and limitations
|
||
|
# under the License.
|
||
|
|
||
|
import datetime
|
||
|
import decimal
|
||
|
from collections import OrderedDict
|
||
|
import io
|
||
|
|
||
|
try:
|
||
|
import numpy as np
|
||
|
except ImportError:
|
||
|
np = None
|
||
|
import pytest
|
||
|
|
||
|
import pyarrow as pa
|
||
|
from pyarrow.tests.parquet.common import _check_roundtrip, make_sample_file
|
||
|
from pyarrow.fs import LocalFileSystem
|
||
|
from pyarrow.tests import util
|
||
|
|
||
|
try:
|
||
|
import pyarrow.parquet as pq
|
||
|
from pyarrow.tests.parquet.common import _write_table
|
||
|
except ImportError:
|
||
|
pq = None
|
||
|
|
||
|
|
||
|
try:
|
||
|
import pandas as pd
|
||
|
import pandas.testing as tm
|
||
|
|
||
|
from pyarrow.tests.parquet.common import alltypes_sample
|
||
|
except ImportError:
|
||
|
pd = tm = None
|
||
|
|
||
|
|
||
|
# Marks all of the tests in this module
|
||
|
# Ignore these with pytest ... -m 'not parquet'
|
||
|
pytestmark = pytest.mark.parquet
|
||
|
|
||
|
|
||
|
@pytest.mark.pandas
def test_parquet_metadata_api():
    """Sniff-test the file, schema, row-group and column-chunk metadata APIs.

    A 10000-row sample frame (columns sorted by name, random integer index)
    is written with ``make_sample_file`` and the resulting metadata objects
    are probed attribute by attribute, including out-of-range indexing.
    """
    df = alltypes_sample(size=10000)
    df = df.reindex(columns=sorted(df.columns))
    df.index = np.random.randint(0, 1000000, size=len(df))

    fileh = make_sample_file(df)
    ncols = len(df.columns)

    # Series of sniff tests
    meta = fileh.metadata
    repr(meta)
    assert meta.num_rows == len(df)
    assert meta.num_columns == ncols + 1  # +1 for index
    assert meta.num_row_groups == 1
    assert meta.format_version == '2.6'
    assert 'parquet-cpp' in meta.created_by
    assert isinstance(meta.serialized_size, int)
    assert isinstance(meta.metadata, dict)

    # Schema
    schema = fileh.schema
    assert meta.schema is schema
    assert len(schema) == ncols + 1  # +1 for index
    repr(schema)

    col = schema[0]
    repr(col)
    assert col.name == df.columns[0]
    assert col.max_definition_level == 1
    assert col.max_repetition_level == 0
    assert col.physical_type == 'BOOLEAN'
    assert col.converted_type == 'NONE'

    col_float16 = schema[5]
    assert col_float16.logical_type.type == 'FLOAT16'

    with pytest.raises(IndexError):
        schema[ncols + 1]  # +1 for index

    # Negative indexing is rejected rather than wrapping around
    with pytest.raises(IndexError):
        schema[-1]

    # Row group
    for rg in range(meta.num_row_groups):
        rg_meta = meta.row_group(rg)
        assert isinstance(rg_meta, pq.RowGroupMetaData)
        repr(rg_meta)

        # Distinct loop name so the ColumnSchema bound to `col` above is
        # not silently replaced by an integer.
        for col_idx in range(rg_meta.num_columns):
            col_meta = rg_meta.column(col_idx)
            assert isinstance(col_meta, pq.ColumnChunkMetaData)
            repr(col_meta)

    with pytest.raises(IndexError):
        meta.row_group(-1)

    with pytest.raises(IndexError):
        meta.row_group(meta.num_row_groups + 1)

    rg_meta = meta.row_group(0)
    assert rg_meta.num_rows == len(df)
    assert rg_meta.num_columns == ncols + 1  # +1 for index
    assert rg_meta.total_byte_size > 0

    # The calls below are expected to raise, so their result is not bound.
    with pytest.raises(IndexError):
        rg_meta.column(-1)

    with pytest.raises(IndexError):
        rg_meta.column(ncols + 2)

    col_meta = rg_meta.column(0)
    assert col_meta.file_offset == 0
    assert col_meta.file_path == ''  # created from BytesIO
    assert col_meta.physical_type == 'BOOLEAN'
    assert col_meta.num_values == 10000
    assert col_meta.path_in_schema == 'bool'
    assert col_meta.is_stats_set is True
    assert isinstance(col_meta.statistics, pq.Statistics)
    assert col_meta.compression == 'SNAPPY'
    assert set(col_meta.encodings) == {'PLAIN', 'RLE'}
    assert col_meta.has_dictionary_page is False
    assert col_meta.dictionary_page_offset is None
    assert col_meta.data_page_offset > 0
    assert col_meta.total_compressed_size > 0
    assert col_meta.total_uncompressed_size > 0
    with pytest.raises(NotImplementedError):
        col_meta.has_index_page
    with pytest.raises(NotImplementedError):
        col_meta.index_page_offset
|
||
|
|
||
|
|
||
|
def test_parquet_metadata_lifetime(tempdir):
    """ARROW-6642: chained metadata access must keep parent objects alive."""
    target = tempdir / 'test_metadata_segfault.parquet'
    pq.write_table(pa.table({'a': [1, 2, 3]}), target)

    reopened = pq.ParquetFile(target)
    # Only the chained attribute access matters here; the statistics object
    # itself is deliberately discarded.
    reopened.metadata.row_group(0).column(0).statistics
|
||
|
|
||
|
|
||
|
@pytest.mark.pandas
@pytest.mark.parametrize(
    'data,type,physical_type,min_value,max_value,'
    'null_count,num_values,distinct_count',
    [
        ([1, 2, 2, None, 4], pa.uint8(), 'INT32', 1, 4, 1, 4, None),
        ([1, 2, 2, None, 4], pa.uint16(), 'INT32', 1, 4, 1, 4, None),
        ([1, 2, 2, None, 4], pa.uint32(), 'INT32', 1, 4, 1, 4, None),
        ([1, 2, 2, None, 4], pa.uint64(), 'INT64', 1, 4, 1, 4, None),
        ([-1, 2, 2, None, 4], pa.int8(), 'INT32', -1, 4, 1, 4, None),
        ([-1, 2, 2, None, 4], pa.int16(), 'INT32', -1, 4, 1, 4, None),
        ([-1, 2, 2, None, 4], pa.int32(), 'INT32', -1, 4, 1, 4, None),
        ([-1, 2, 2, None, 4], pa.int64(), 'INT64', -1, 4, 1, 4, None),
        (
            [-1.1, 2.2, 2.3, None, 4.4], pa.float32(),
            'FLOAT', -1.1, 4.4, 1, 4, None
        ),
        (
            [-1.1, 2.2, 2.3, None, 4.4], pa.float64(),
            'DOUBLE', -1.1, 4.4, 1, 4, None
        ),
        (
            ['', 'b', chr(1000), None, 'aaa'], pa.binary(),
            'BYTE_ARRAY', b'', chr(1000).encode('utf-8'), 1, 4, None
        ),
        (
            [True, False, False, True, True], pa.bool_(),
            'BOOLEAN', False, True, 0, 5, None
        ),
        (
            [b'\x00', b'b', b'12', None, b'aaa'], pa.binary(),
            'BYTE_ARRAY', b'\x00', b'b', 1, 4, None
        ),
    ]
)
def test_parquet_column_statistics_api(data, type, physical_type, min_value,
                                       max_value, null_count, num_values,
                                       distinct_count):
    """Check the column-chunk statistics reported for a one-column table.

    Each case supplies the raw values, the Arrow type to cast them to, and
    the statistics expected for the single column chunk that is written.
    """
    frame = pd.DataFrame({'data': data})
    arrow_schema = pa.schema([pa.field('data', type)])
    table = pa.Table.from_pandas(frame, schema=arrow_schema, safe=False)

    fileh = make_sample_file(table)
    stat = fileh.metadata.row_group(0).column(0).statistics

    assert stat.has_min_max
    # Float types are compared with a tolerance, everything else exactly.
    assert _close(type, stat.min, min_value)
    assert _close(type, stat.max, max_value)
    assert stat.null_count == null_count
    assert stat.num_values == num_values
    # NOTE(review): an older TODO(kszucs) here said a missing distinct_count
    # is represented as zero, but every case above expects None -- confirm
    # against the parquet-cpp API and drop the TODO if it is obsolete.
    assert stat.distinct_count == distinct_count
    assert stat.physical_type == physical_type
|
||
|
|
||
|
|
||
|
def _close(type, left, right):
    """Compare two statistics values, allowing slack for float types.

    float32 values must differ by less than 1E-7 and float64 values by less
    than 1E-13 (absolute difference); any other type is compared with ``==``.
    """
    if type == pa.float32():
        tolerance = 1E-7
    elif type == pa.float64():
        tolerance = 1E-13
    else:
        return left == right
    return abs(left - right) < tolerance
|
||
|
|
||
|
|
||
|
# ARROW-6339
@pytest.mark.pandas
def test_parquet_raise_on_unset_statistics():
    """A column holding only NaT has no min/max and reports ``max`` as None."""
    only_nat = pd.Series([pd.NaT], dtype="datetime64[ns]")
    table = pa.Table.from_pandas(pd.DataFrame({"t": only_nat}))
    meta = make_sample_file(table).metadata

    stats = meta.row_group(0).column(0).statistics
    assert not stats.has_min_max
    assert stats.max is None
|
||
|
|
||
|
|
||
|
def test_statistics_convert_logical_types(tempdir):
    """ARROW-5166, ARROW-4139: statistics come back as the written values.

    For each logical type a two-row column ``[min, max]`` is written and the
    min/max statistics read back must equal the original Python objects.
    """
    time_lo = datetime.time(10, 30, 0, 1000)
    time_hi = datetime.time(15, 30, 0, 1000)
    stamp_lo = datetime.datetime(2019, 6, 24, 0, 0, 0, 1000)
    stamp_hi = datetime.datetime(2019, 6, 25, 0, 0, 0, 1000)

    # (min, max, type)
    cases = [
        (10, 11164359321221007157, pa.uint64()),
        (10, 4294967295, pa.uint32()),
        ("ähnlich", "öffentlich", pa.utf8()),
        (time_lo, time_hi, pa.time32('ms')),
        (time_lo, time_hi, pa.time64('us')),
        (stamp_lo, stamp_hi, pa.timestamp('ms')),
        (stamp_lo, stamp_hi, pa.timestamp('us')),
        (datetime.date(2019, 6, 24), datetime.date(2019, 6, 25),
         pa.date32()),
        (decimal.Decimal("20.123"), decimal.Decimal("20.124"),
         pa.decimal128(12, 5)),
    ]

    for case_no, (lowest, highest, arrow_type) in enumerate(cases):
        column = pa.array([lowest, highest], type=arrow_type)
        table = pa.Table.from_arrays([column], ['col'])
        path = str(tempdir / f'example{case_no}.parquet')
        pq.write_table(table, path, version='2.6')

        parquet_file = pq.ParquetFile(path)
        stats = parquet_file.metadata.row_group(0).column(0).statistics
        assert stats.min == lowest
        assert stats.max == highest
|
||
|
|
||
|
|
||
|
def test_parquet_write_disable_statistics(tempdir):
    """Exercise ``write_statistics``: default, ``False`` and a column list."""
    columns = OrderedDict([
        ('a', pa.array([1, 2, 3])),
        ('b', pa.array(['a', 'b', 'c']))
    ])
    table = pa.Table.from_pydict(columns)

    # Default: statistics are written for every column
    default_path = tempdir / 'data.parquet'
    _write_table(table, default_path)
    meta = pq.read_metadata(default_path)
    for idx in (0, 1):
        chunk = meta.row_group(0).column(idx)
        assert chunk.is_stats_set is True
        assert chunk.statistics is not None

    # Disabled: no column carries statistics
    disabled_path = tempdir / 'data2.parquet'
    _write_table(table, disabled_path, write_statistics=False)
    meta = pq.read_metadata(disabled_path)
    for idx in (0, 1):
        chunk = meta.row_group(0).column(idx)
        assert chunk.is_stats_set is False
        assert chunk.statistics is None

    # Selective: only the listed column 'a' carries statistics
    selective_path = tempdir / 'data3.parquet'
    _write_table(table, selective_path, write_statistics=['a'])
    row_group = pq.read_metadata(selective_path).row_group(0)
    chunk_a = row_group.column(0)
    chunk_b = row_group.column(1)
    assert chunk_a.is_stats_set is True
    assert chunk_b.is_stats_set is False
    assert chunk_a.statistics is not None
    assert chunk_b.statistics is None
|
||
|
|
||
|
|
||
|
def test_parquet_sorting_column():
    """Check ``pq.SortingColumn`` construction and ordering conversions.

    Covers ``to_dict``, the ``to_ordering`` / ``from_ordering`` round trip,
    empty inputs, and the two error cases (invalid sort order, inconsistent
    null placement).
    """
    sorting_col = pq.SortingColumn(10)
    assert sorting_col.to_dict() == {
        'column_index': 10,
        'descending': False,
        'nulls_first': False
    }

    sorting_col = pq.SortingColumn(0, descending=True, nulls_first=True)
    assert sorting_col.to_dict() == {
        'column_index': 0,
        'descending': True,
        'nulls_first': True
    }

    schema = pa.schema([('a', pa.int64()), ('b', pa.int64())])
    sorting_cols = (
        pq.SortingColumn(1, descending=True),
        pq.SortingColumn(0, descending=False),
    )
    sort_order, null_placement = pq.SortingColumn.to_ordering(schema, sorting_cols)
    assert sort_order == (('b', "descending"), ('a', "ascending"))
    assert null_placement == "at_end"

    sorting_cols_roundtripped = pq.SortingColumn.from_ordering(
        schema, sort_order, null_placement)
    assert sorting_cols_roundtripped == sorting_cols

    sorting_cols = pq.SortingColumn.from_ordering(
        schema, ('a', ('b', "descending")), null_placement="at_start")
    expected = (
        pq.SortingColumn(0, descending=False, nulls_first=True),
        pq.SortingColumn(1, descending=True, nulls_first=True),
    )
    assert sorting_cols == expected

    # Conversions handle empty tuples
    empty_sorting_cols = pq.SortingColumn.from_ordering(schema, ())
    assert empty_sorting_cols == ()

    assert pq.SortingColumn.to_ordering(schema, ()) == ((), "at_end")

    # The trailing comma is essential: without it the argument collapses to
    # the flat tuple ("a", "not a valid sort order"), i.e. two bare column
    # names, and the invalid sort order is never actually exercised.
    with pytest.raises(ValueError):
        pq.SortingColumn.from_ordering(
            schema, (("a", "not a valid sort order"),))

    with pytest.raises(ValueError, match="inconsistent null placement"):
        sorting_cols = (
            pq.SortingColumn(1, nulls_first=True),
            pq.SortingColumn(0, nulls_first=False),
        )
        pq.SortingColumn.to_ordering(schema, sorting_cols)
|
||
|
|
||
|
|
||
|
def test_parquet_sorting_column_nested():
    """``to_ordering`` resolves column indices against a nested schema."""
    nested_schema = pa.schema({
        'a': pa.struct([('x', pa.int64()), ('y', pa.int64())]),
        'b': pa.int64()
    })

    by_ax_desc = pq.SortingColumn(0, descending=True)   # a.x
    by_b_asc = pq.SortingColumn(2, descending=False)    # b

    ordering = pq.SortingColumn.to_ordering(
        nested_schema, [by_ax_desc, by_b_asc])
    sort_order, null_placement = ordering

    assert null_placement == "at_end"
    assert len(sort_order) == 2
    first_key, second_key = sort_order[0], sort_order[1]
    assert first_key == ("a.x", "descending")
    assert second_key == ("b", "ascending")
|
||
|
|
||
|
|
||
|
def test_parquet_file_sorting_columns():
    """Sorting columns passed to the writer are readable from the metadata."""
    table = pa.table({'a': [1, 2, 3], 'b': ['a', 'b', 'c']})
    sorting_columns = (
        pq.SortingColumn(column_index=0, descending=True, nulls_first=True),
        pq.SortingColumn(column_index=1, descending=False),
    )

    sink = pa.BufferOutputStream()
    _write_table(table, sink, sorting_columns=sorting_columns)
    metadata = pq.read_metadata(pa.BufferReader(sink.getvalue()))

    # Can retrieve sorting columns from metadata
    assert sorting_columns == metadata.row_group(0).sorting_columns

    as_dict = metadata.to_dict()
    assert as_dict.get('num_columns') == 2
    assert as_dict.get('num_rows') == 3
    assert as_dict.get('num_row_groups') == 1
|
||
|
|
||
|
|
||
|
def test_field_id_metadata():
    """ARROW-7080: ``PARQUET:field_id`` field metadata survives a round trip.

    Field ids are attached to flat, list, and nested struct fields, written
    to an in-memory Parquet file, and checked on the Arrow schema read back.
    """
    field_id = b'PARQUET:field_id'

    inner = pa.field('inner', pa.int32(), metadata={field_id: b'100'})
    middle = pa.field('middle', pa.struct([inner]),
                      metadata={field_id: b'101'})
    list_item = pa.field('list-inner', pa.int32(),
                         metadata={field_id: b'10'})

    fields = [
        pa.field('basic', pa.int32(),
                 metadata={b'other': b'abc', field_id: b'1'}),
        pa.field('list', pa.list_(list_item), metadata={field_id: b'11'}),
        pa.field('struct', pa.struct([middle]), metadata={field_id: b'102'}),
        pa.field('no-metadata', pa.int32()),
        pa.field('non-integral-field-id', pa.int32(),
                 metadata={field_id: b'xyz'}),
        pa.field('negative-field-id', pa.int32(),
                 metadata={field_id: b'-1000'})
    ]
    # One empty column per field: only the schema matters for this test
    empty_columns = [[] for _ in fields]
    table = pa.table(empty_columns, schema=pa.schema(fields))

    sink = pa.BufferOutputStream()
    pq.write_table(table, sink)
    parquet_file = pq.ParquetFile(pa.BufferReader(sink.getvalue()))
    schema = parquet_file.schema_arrow

    basic_field = schema[0]
    assert basic_field.metadata[field_id] == b'1'
    assert basic_field.metadata[b'other'] == b'abc'

    list_field = schema[1]
    assert list_field.metadata[field_id] == b'11'
    assert list_field.type.value_field.metadata[field_id] == b'10'

    struct_field = schema[2]
    assert struct_field.metadata[field_id] == b'102'

    struct_middle_field = struct_field.type[0]
    assert struct_middle_field.metadata[field_id] == b'101'

    struct_inner_field = struct_middle_field.type[0]
    assert struct_inner_field.metadata[field_id] == b'100'

    assert schema[3].metadata is None
    # Invalid input is passed through (ok) but does not
    # have field_id in parquet (not tested)
    assert schema[4].metadata[field_id] == b'xyz'
    assert schema[5].metadata[field_id] == b'-1000'
|
||
|
|
||
|
|
||
|
def test_parquet_file_page_index():
    """``write_page_index`` controls the offset/column index flags.

    The same table is written with the option off and on; the column chunk
    must report ``has_offset_index`` and ``has_column_index`` accordingly.
    """
    # The table does not depend on the option, so build it once.
    table = pa.table({'a': [1, 2, 3]})

    for write_page_index in (False, True):
        writer = pa.BufferOutputStream()
        _write_table(table, writer, write_page_index=write_page_index)
        reader = pa.BufferReader(writer.getvalue())

        # Page index presence is exposed on the column chunk metadata
        metadata = pq.read_metadata(reader)
        cc = metadata.row_group(0).column(0)
        assert cc.has_offset_index is write_page_index
        assert cc.has_column_index is write_page_index
|
||
|
|
||
|
|
||
|
@pytest.mark.pandas
def test_multi_dataset_metadata(tempdir):
    """Merge per-file metadata into a ``_metadata`` file and read it back."""
    filenames = ["ARROW-1983-dataset.0", "ARROW-1983-dataset.1"]
    metapath = str(tempdir / "_metadata")

    # create a test dataset
    frame = pd.DataFrame({
        'one': [1, 2, 3],
        'two': [-1, -2, -3],
        'three': [[1, 2], [2, 3], [3, 4]],
    })
    table = pa.Table.from_pandas(frame)

    # write dataset twice and collect/merge metadata
    merged = None
    for filename in filenames:
        collected = []
        pq.write_table(table, str(tempdir / filename),
                       metadata_collector=collected)
        file_meta = collected[0]
        file_meta.set_file_path(filename)
        if merged is not None:
            merged.append_row_groups(file_meta)
        else:
            merged = file_meta

    # Write merged metadata-only file
    with open(metapath, "wb") as sink:
        merged.write_metadata_file(sink)

    # Read back the metadata
    read_back = pq.read_metadata(metapath).to_dict()
    in_memory = merged.to_dict()
    for key, value in in_memory.items():
        # serialized_size differs between the two and is checked below
        if key == 'serialized_size':
            continue
        assert value == read_back[key]

    assert in_memory['num_columns'] == 3
    assert in_memory['num_rows'] == 6
    assert in_memory['num_row_groups'] == 2
    assert in_memory['serialized_size'] == 0
    assert read_back['serialized_size'] > 0
|
||
|
|
||
|
|
||
|
def test_metadata_hashing(tempdir):
    """Metadata hashes are stable, schema-dependent and path-independent."""

    def _roundtrip(file_name, fields):
        # Write a metadata-only file for the schema and read it back
        target = str(tempdir / file_name)
        pq.write_metadata(pa.schema(fields), target)
        return pq.read_metadata(target)

    parquet_meta1 = _roundtrip("metadata1", [("a", "int64"), ("b", "float64")])
    # Same as 1, just different path
    parquet_meta2 = _roundtrip("metadata2", [("a", "int64"), ("b", "float64")])
    # different schema
    parquet_meta3 = _roundtrip("metadata3", [("a", "int64"), ("b", "float32")])

    # Deterministic
    first_hash = hash(parquet_meta1)
    assert first_hash == hash(parquet_meta1)  # equal w/ same instance
    assert first_hash == hash(parquet_meta2)  # equal w/ different instance

    # Not the same as other metadata with different schema
    assert first_hash != hash(parquet_meta3)
|
||
|
|
||
|
|
||
|
@pytest.mark.filterwarnings("ignore:Parquet format:FutureWarning")
def test_write_metadata(tempdir):
    """Cover ``pq.write_metadata``: schema, writer kwargs and collectors."""
    path = str(tempdir / "metadata")
    schema = pa.schema([("a", "int64"), ("b", "float64")])

    # write a pyarrow schema
    pq.write_metadata(schema, path)
    schema_as_arrow = pq.read_metadata(path).schema.to_arrow_schema()
    assert schema_as_arrow.equals(schema)

    # ARROW-8980: Check that the ARROW:schema metadata key was removed
    arrow_metadata = schema_as_arrow.metadata
    assert not arrow_metadata or b'ARROW:schema' not in arrow_metadata

    # pass through writer keyword arguments
    # The version is stored as a single integer in the Parquet metadata,
    # so it cannot correctly express dotted format versions
    requested_to_reported = (("1.0", "1.0"), ("2.4", "2.6"), ("2.6", "2.6"))
    for version, expected_version in requested_to_reported:
        pq.write_metadata(schema, path, version=version)
        assert pq.read_metadata(path).format_version == expected_version

    # metadata_collector: list of FileMetaData objects
    data_path = tempdir / "data.parquet"
    table = pa.table({'a': [1, 2], 'b': [0.1, 0.2]}, schema=schema)
    pq.write_table(table, data_path)
    parquet_meta = pq.read_metadata(str(data_path))
    collected = [parquet_meta, parquet_meta]
    pq.write_metadata(schema, path, metadata_collector=collected)
    assert pq.read_metadata(path).num_row_groups == 2

    # append metadata with different schema raises an error
    mismatched_schema = pa.schema([("a", "int32"), ("b", "null")])
    msg = ("AppendRowGroups requires equal schemas.\n"
           "The two columns with index 0 differ.")
    with pytest.raises(RuntimeError, match=msg):
        pq.write_metadata(
            mismatched_schema, path,
            metadata_collector=[parquet_meta, parquet_meta]
        )
|
||
|
|
||
|
|
||
|
def test_table_large_metadata():
    """ARROW-8694: a table with a 10,000,000-character metadata value."""
    huge_value = 'x' * 10000000
    fields = [pa.field('f0', 'double')]
    my_schema = pa.schema(fields, metadata={'large': huge_value})

    _check_roundtrip(pa.table([range(10)], schema=my_schema))
|
||
|
|
||
|
|
||
|
@pytest.mark.pandas
def test_compare_schemas():
    """Equality semantics of ``ParquetSchema`` and ``ColumnSchema``."""
    df = alltypes_sample(size=10000)

    schema = make_sample_file(df).schema
    same_schema = make_sample_file(df).schema
    # Every other column only, hence a different schema
    subset_schema = make_sample_file(df[df.columns[::2]]).schema

    # ParquetSchema
    assert isinstance(schema, pq.ParquetSchema)
    assert schema.equals(schema)
    assert schema == schema
    assert schema.equals(same_schema)
    assert schema == same_schema
    assert schema != 'arbitrary object'
    assert not schema.equals(subset_schema)
    assert schema != subset_schema

    # ColumnSchema
    assert isinstance(schema[0], pq.ColumnSchema)
    assert schema[0].equals(schema[0])
    assert schema[0] == schema[0]
    assert not schema[0].equals(schema[1])
    assert schema[0] != schema[1]
    assert schema[0] != 'arbitrary object'
|
||
|
|
||
|
|
||
|
@pytest.mark.pandas
def test_read_schema(tempdir):
    """``pq.read_schema`` matches the written schema, with and without mmap."""
    N = 100
    frame = pd.DataFrame(
        {'index': np.arange(N), 'values': np.random.randn(N)},
        columns=['index', 'values'],
    )
    table = pa.Table.from_pandas(frame)

    data_path = tempdir / 'test.parquet'
    _write_table(table, data_path)

    plain_read = pq.read_schema(data_path)
    mmap_read = pq.read_schema(data_path, memory_map=True)
    for read_schema in (plain_read, mmap_read):
        assert table.schema.equals(read_schema)

    assert table.schema.metadata[b'pandas'] == plain_read.metadata[b'pandas']
|
||
|
|
||
|
|
||
|
def test_parquet_metadata_empty_to_dict(tempdir):
    """``to_dict`` works for an empty table and reports no statistics."""
    # https://issues.apache.org/jira/browse/ARROW-10146
    target = tempdir / "data.parquet"
    empty_table = pa.table({"a": pa.array([], type="int64")})
    pq.write_table(empty_table, target)

    # ensure this doesn't error / statistics set to None
    row_groups = pq.read_metadata(target).to_dict()["row_groups"]
    assert len(row_groups) == 1

    columns = row_groups[0]["columns"]
    assert len(columns) == 1
    assert columns[0]["statistics"] is None
|
||
|
|
||
|
|
||
|
@pytest.mark.slow
@pytest.mark.large_memory
def test_metadata_exceeds_message_size():
    """Write and re-read a very large merged metadata file.

    ARROW-13655: Thrift may enable a default message size that limits
    the size of Parquet metadata that can be written.
    """
    NCOLS = 1000
    NREPEATS = 4000

    columns = {str(i): np.random.randn(10) for i in range(NCOLS)}
    table = pa.table(columns)

    with pa.BufferOutputStream() as out:
        pq.write_table(table, out)
        data_buf = out.getvalue()

    # Two independent FileMetaData objects: one grows, one is the increment
    original_metadata = pq.read_metadata(pa.BufferReader(data_buf))
    metadata = pq.read_metadata(pa.BufferReader(data_buf))
    for _ in range(NREPEATS):
        metadata.append_row_groups(original_metadata)

    with pa.BufferOutputStream() as out:
        metadata.write_metadata_file(out)
        metadata_buf = out.getvalue()

    # Reading the oversized metadata back must not raise
    pq.read_metadata(pa.BufferReader(metadata_buf))
|
||
|
|
||
|
|
||
|
def test_metadata_schema_filesystem(tempdir):
    """``read_metadata`` / ``read_schema`` accept paths, URIs and filesystems."""
    table = pa.table({"a": [1, 2, 3]})

    # URI writing to local file.
    fname = "data.parquet"
    file_path = str(tempdir / fname)
    file_uri = f'file:///{file_path}'
    dir_uri = f'file:///{tempdir}'

    pq.write_table(table, file_path)

    # Get expected `metadata` from path.
    metadata = pq.read_metadata(tempdir / fname)
    schema = table.schema

    from_uri = pq.read_metadata(file_uri)
    assert from_uri.equals(metadata)
    from_local_fs = pq.read_metadata(file_path, filesystem=LocalFileSystem())
    assert from_local_fs.equals(metadata)
    from_fs_uri = pq.read_metadata(fname, filesystem=dir_uri)
    assert from_fs_uri.equals(metadata)

    schema_from_uri = pq.read_schema(file_uri)
    assert schema_from_uri.equals(schema)
    schema_from_local_fs = pq.read_schema(
        file_path, filesystem=LocalFileSystem())
    assert schema_from_local_fs.equals(schema)
    schema_from_fs_uri = pq.read_schema(fname, filesystem=dir_uri)
    assert schema_from_fs_uri.equals(schema)

    with util.change_cwd(tempdir):
        # Pass `filesystem` arg
        relative_meta = pq.read_metadata(fname, filesystem=LocalFileSystem())
        assert relative_meta.equals(metadata)

        relative_schema = pq.read_schema(fname, filesystem=LocalFileSystem())
        assert relative_schema.equals(schema)
|
||
|
|
||
|
|
||
|
def test_metadata_equals():
    """``FileMetaData.equals(None)`` raises ``TypeError``."""
    with pa.BufferOutputStream() as out:
        pq.write_table(pa.table({"a": [1, 2, 3]}), out)
        written = out.getvalue()

    original_metadata = pq.read_metadata(pa.BufferReader(written))

    expected_message = "Argument 'other' has incorrect type"
    with pytest.raises(TypeError, match=expected_message):
        original_metadata.equals(None)
|
||
|
|
||
|
|
||
|
@pytest.mark.parametrize("t1,t2,expected_error", (
    ({'col1': range(10)}, {'col1': range(10)}, None),
    ({'col1': range(10)}, {'col2': range(10)},
     "The two columns with index 0 differ."),
    ({'col1': range(10), 'col2': range(10)}, {'col3': range(10)},
     "This schema has 2 columns, other has 1")
))
def test_metadata_append_row_groups_diff(t1, t2, expected_error):
    """``append_row_groups`` succeeds on equal schemas and explains mismatches."""

    def _metadata_of(columns):
        # Write the columns to an in-memory Parquet file, return its metadata
        sink = io.BytesIO()
        pq.write_table(pa.table(columns), sink)
        sink.seek(0)
        return pq.ParquetFile(sink).metadata

    meta1 = _metadata_of(t1)
    meta2 = _metadata_of(t2)

    if not expected_error:
        meta1.append_row_groups(meta2)
        return

    # Error clearly defines it's happening at append row groups call
    prefix = "AppendRowGroups requires equal schemas.\n"
    with pytest.raises(RuntimeError, match=prefix + expected_error):
        meta1.append_row_groups(meta2)
|
||
|
|
||
|
|
||
|
@pytest.mark.s3
def test_write_metadata_fs_file_combinations(tempdir, s3_example_s3fs):
    """``pq.write_metadata`` writes identical bytes for every target kind.

    Targets covered: plain local path, local path with an explicit
    ``LocalFileSystem``, local file URI, open file object, and a path on the
    S3 filesystem supplied by the ``s3_example_s3fs`` fixture.
    """
    s3_fs, s3_path = s3_example_s3fs

    meta1 = tempdir / "meta1"
    meta2 = tempdir / "meta2"
    meta3 = tempdir / "meta3"
    meta4 = tempdir / "meta4"
    meta5 = f"{s3_path}/meta5"

    table = pa.table({"col": range(5)})

    # plain local path
    pq.write_metadata(table.schema, meta1, [])

    # Used the localfilesystem to resolve opening an output stream
    pq.write_metadata(table.schema, meta2, [], filesystem=LocalFileSystem())

    # Can resolve local file URI
    pq.write_metadata(table.schema, meta3.as_uri(), [])

    # Take a file-like obj all the way thru?
    with meta4.open('wb+') as meta4_stream:
        pq.write_metadata(table.schema, meta4_stream, [])

    # S3FileSystem
    pq.write_metadata(table.schema, meta5, [], filesystem=s3_fs)

    # Read the S3 object through a context manager so the handle is closed
    # deterministically instead of being left to garbage collection.
    with s3_fs.open(meta5) as s3_stream:
        s3_bytes = s3_stream.read()

    # Compare each target to the reference separately so a failure names
    # the target that differs.
    reference = meta1.read_bytes()
    assert meta2.read_bytes() == reference
    assert meta3.read_bytes() == reference
    assert meta4.read_bytes() == reference
    assert s3_bytes == reference
|
||
|
|
||
|
|
||
|
def test_column_chunk_key_value_metadata(parquet_test_datadir):
    """Column-chunk key/value metadata is exposed via ``.metadata``."""
    sample = parquet_test_datadir / 'column_chunk_key_value_metadata.parquet'
    row_group = pq.read_metadata(sample).row_group(0)

    with_metadata = row_group.column(0).metadata
    assert with_metadata == {b'foo': b'bar', b'thisiskeywithoutvalue': b''}

    # The second column has no key/value metadata
    without_metadata = row_group.column(1).metadata
    assert without_metadata is None
|
||
|
|
||
|
|
||
|
def test_internal_class_instantiation():
    """Internal metadata classes refuse direct construction with TypeError."""
    internal_classes = (
        "Statistics",
        "ParquetLogicalType",
        "ColumnChunkMetaData",
        "RowGroupMetaData",
        "FileMetaData",
    )

    for c in internal_classes:
        expected_message = f"Do not call {c}'s constructor directly"
        with pytest.raises(TypeError, match=expected_message):
            getattr(pq, c)()
|