# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import pytest

import pyarrow as pa
from pyarrow import fs

try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import (_read_table, _test_dataframe,
                                              _range_integers)
except ImportError:
    pq = None


try:
    import pandas as pd
    import pandas.testing as tm

except ImportError:
    pd = tm = None


# Marks all of the tests in this module
# Ignore these with pytest ... -m 'not parquet'
pytestmark = pytest.mark.parquet


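# The ParquetWriter tests below share a pattern: successive write_table()
# calls append row groups to a single output, and the file footer is only
# finalized when the writer is closed (explicitly or by the context manager).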
@pytest.mark.pandas
def test_parquet_incremental_file_build(tempdir):
    df = _test_dataframe(100)
    df['unique_id'] = 0

    arrow_table = pa.Table.from_pandas(df, preserve_index=False)
    out = pa.BufferOutputStream()

    writer = pq.ParquetWriter(out, arrow_table.schema, version='2.6')

    frames = []
    for i in range(10):
        df['unique_id'] = i
        arrow_table = pa.Table.from_pandas(df, preserve_index=False)
        writer.write_table(arrow_table)

        frames.append(df.copy())

    writer.close()

    buf = out.getvalue()
    result = _read_table(pa.BufferReader(buf))

    expected = pd.concat(frames, ignore_index=True)
    tm.assert_frame_equal(result.to_pandas(), expected)


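# ParquetWriter checks each written table against the schema passed at
# construction: here the table carries an int64 'POS' column while the
# declared schema expects uint32, so write_table() must raise.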
def test_validate_schema_write_table(tempdir):
    # ARROW-2926
    simple_fields = [
        pa.field('POS', pa.uint32()),
        pa.field('desc', pa.string())
    ]

    simple_schema = pa.schema(simple_fields)

    # simple_table schema does not match simple_schema
    simple_from_array = [pa.array([1]), pa.array(['bla'])]
    simple_table = pa.Table.from_arrays(simple_from_array, ['POS', 'desc'])

    path = tempdir / 'simple_validate_schema.parquet'

    with pq.ParquetWriter(path, simple_schema,
                          version='2.6',
                          compression='snappy', flavor='spark') as w:
        with pytest.raises(ValueError):
            w.write_table(simple_table)


def test_parquet_invalid_writer(tempdir):
    # avoid segfaults with invalid construction
    with pytest.raises(TypeError):
        some_schema = pa.schema([pa.field("x", pa.int32())])
        pq.ParquetWriter(None, some_schema)

    with pytest.raises(TypeError):
        pq.ParquetWriter(tempdir / "some_path", None)


@pytest.mark.pandas
def test_parquet_writer_context_obj(tempdir):
    df = _test_dataframe(100)
    df['unique_id'] = 0

    arrow_table = pa.Table.from_pandas(df, preserve_index=False)
    out = pa.BufferOutputStream()

    with pq.ParquetWriter(out, arrow_table.schema, version='2.6') as writer:

        frames = []
        for i in range(10):
            df['unique_id'] = i
            arrow_table = pa.Table.from_pandas(df, preserve_index=False)
            writer.write_table(arrow_table)

            frames.append(df.copy())

    buf = out.getvalue()
    result = _read_table(pa.BufferReader(buf))

    expected = pd.concat(frames, ignore_index=True)
    tm.assert_frame_equal(result.to_pandas(), expected)


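# If an exception interrupts the writing loop, the context manager still
# closes the writer on exit, so the row groups written before the error
# remain readable from the buffer.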
@pytest.mark.pandas
def test_parquet_writer_context_obj_with_exception(tempdir):
    df = _test_dataframe(100)
    df['unique_id'] = 0

    arrow_table = pa.Table.from_pandas(df, preserve_index=False)
    out = pa.BufferOutputStream()
    error_text = 'Artificial Error'

    try:
        with pq.ParquetWriter(out,
                              arrow_table.schema,
                              version='2.6') as writer:

            frames = []
            for i in range(10):
                df['unique_id'] = i
                arrow_table = pa.Table.from_pandas(df, preserve_index=False)
                writer.write_table(arrow_table)
                frames.append(df.copy())
                if i == 5:
                    raise ValueError(error_text)
    except Exception as e:
        assert str(e) == error_text

    buf = out.getvalue()
    result = _read_table(pa.BufferReader(buf))

    expected = pd.concat(frames, ignore_index=True)
    tm.assert_frame_equal(result.to_pandas(), expected)


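# ParquetWriter.write() is a convenience wrapper accepting either a Table
# or a RecordBatch; it is exercised below alongside the dedicated
# write_table() and write_batch() methods.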
@pytest.mark.pandas
@pytest.mark.parametrize("filesystem", [
    None,
    fs.LocalFileSystem(),
])
def test_parquet_writer_write_wrappers(tempdir, filesystem):
    df = _test_dataframe(100)
    table = pa.Table.from_pandas(df, preserve_index=False)
    batch = pa.RecordBatch.from_pandas(df, preserve_index=False)
    path_table = str(tempdir / 'data_table.parquet')
    path_batch = str(tempdir / 'data_batch.parquet')

    with pq.ParquetWriter(
        path_table, table.schema, filesystem=filesystem, version='2.6'
    ) as writer:
        writer.write_table(table)

    result = _read_table(path_table).to_pandas()
    tm.assert_frame_equal(result, df)

    with pq.ParquetWriter(
        path_batch, table.schema, filesystem=filesystem, version='2.6'
    ) as writer:
        writer.write_batch(batch)

    result = _read_table(path_batch).to_pandas()
    tm.assert_frame_equal(result, df)

    with pq.ParquetWriter(
        path_table, table.schema, filesystem=filesystem, version='2.6'
    ) as writer:
        writer.write(table)

    result = _read_table(path_table).to_pandas()
    tm.assert_frame_equal(result, df)

    with pq.ParquetWriter(
        path_batch, table.schema, filesystem=filesystem, version='2.6'
    ) as writer:
        writer.write(batch)

    result = _read_table(path_batch).to_pandas()
    tm.assert_frame_equal(result, df)


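# row_group_size controls how many rows go into each row group: full-sized
# groups are written first with any remainder in a smaller final group.
# Requested sizes are capped at 64Mi rows (abs_max_chunk_size below), and
# omitting the argument falls back to the 1Mi-row default.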
@pytest.mark.large_memory
@pytest.mark.pandas
def test_parquet_writer_chunk_size(tempdir):
    default_chunk_size = 1024 * 1024
    abs_max_chunk_size = 64 * 1024 * 1024

    def check_chunk_size(data_size, chunk_size, expect_num_chunks):
        table = pa.Table.from_arrays([
            _range_integers(data_size, 'b')
        ], names=['x'])
        if chunk_size is None:
            pq.write_table(table, tempdir / 'test.parquet')
        else:
            pq.write_table(table, tempdir / 'test.parquet',
                           row_group_size=chunk_size)
        metadata = pq.read_metadata(tempdir / 'test.parquet')
        expected_chunk_size = (default_chunk_size if chunk_size is None
                               else chunk_size)
        assert metadata.num_row_groups == expect_num_chunks
        latched_chunk_size = min(expected_chunk_size, abs_max_chunk_size)
        # First chunks should be full size
        for chunk_idx in range(expect_num_chunks - 1):
            assert metadata.row_group(chunk_idx).num_rows == latched_chunk_size
        # Last chunk may be smaller
        remainder = data_size - (expected_chunk_size * (expect_num_chunks - 1))
        if remainder == 0:
            assert metadata.row_group(
                expect_num_chunks - 1).num_rows == latched_chunk_size
        else:
            assert metadata.row_group(expect_num_chunks - 1).num_rows == remainder

    check_chunk_size(default_chunk_size * 2, default_chunk_size - 100, 3)
    check_chunk_size(default_chunk_size * 2, default_chunk_size, 2)
    check_chunk_size(default_chunk_size * 2, default_chunk_size + 100, 2)
    check_chunk_size(default_chunk_size + 100, default_chunk_size + 100, 1)
    # Even though the chunk size requested is large enough it will be capped
    # by the absolute max chunk size
    check_chunk_size(abs_max_chunk_size * 2, abs_max_chunk_size * 2, 2)

    # These tests don't pass a chunk_size to write_table and so the chunk
    # size should be default_chunk_size
    check_chunk_size(default_chunk_size, None, 1)
    check_chunk_size(default_chunk_size + 1, None, 2)


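# filesystem=None lets the writer infer the local filesystem from the plain
# string path; passing fs.LocalFileSystem() explicitly must behave the same.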
@pytest.mark.pandas
@pytest.mark.parametrize("filesystem", [
    None,
    fs.LocalFileSystem(),
])
def test_parquet_writer_filesystem_local(tempdir, filesystem):
    df = _test_dataframe(100)
    table = pa.Table.from_pandas(df, preserve_index=False)
    path = str(tempdir / 'data.parquet')

    with pq.ParquetWriter(
        path, table.schema, filesystem=filesystem, version='2.6'
    ) as writer:
        writer.write_table(table)

    result = _read_table(path).to_pandas()
    tm.assert_frame_equal(result, df)


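# The S3 variants below are gated behind the 's3' marker; the s3_example_fs
# and s3_example_s3fs fixtures supply a filesystem together with a URI/path
# (or directory) inside a test bucket.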
@pytest.mark.pandas
@pytest.mark.s3
def test_parquet_writer_filesystem_s3(s3_example_fs):
    df = _test_dataframe(100)
    table = pa.Table.from_pandas(df, preserve_index=False)

    fs, uri, path = s3_example_fs

    with pq.ParquetWriter(
        path, table.schema, filesystem=fs, version='2.6'
    ) as writer:
        writer.write_table(table)

    result = _read_table(uri).to_pandas()
    tm.assert_frame_equal(result, df)


@pytest.mark.pandas
@pytest.mark.s3
def test_parquet_writer_filesystem_s3_uri(s3_example_fs):
    df = _test_dataframe(100)
    table = pa.Table.from_pandas(df, preserve_index=False)

    fs, uri, path = s3_example_fs

    with pq.ParquetWriter(uri, table.schema, version='2.6') as writer:
        writer.write_table(table)

    result = _read_table(path, filesystem=fs).to_pandas()
    tm.assert_frame_equal(result, df)


@pytest.mark.pandas
@pytest.mark.s3
def test_parquet_writer_filesystem_s3fs(s3_example_s3fs):
    df = _test_dataframe(100)
    table = pa.Table.from_pandas(df, preserve_index=False)

    fs, directory = s3_example_s3fs
    path = directory + "/test.parquet"

    with pq.ParquetWriter(
        path, table.schema, filesystem=fs, version='2.6'
    ) as writer:
        writer.write_table(table)

    result = _read_table(path, filesystem=fs).to_pandas()
    tm.assert_frame_equal(result, df)


@pytest.mark.pandas
def test_parquet_writer_filesystem_buffer_raises():
    df = _test_dataframe(100)
    table = pa.Table.from_pandas(df, preserve_index=False)
    filesystem = fs.LocalFileSystem()

    # Should raise ValueError when filesystem is passed with file-like object
    with pytest.raises(ValueError, match="specified path is file-like"):
        pq.ParquetWriter(
            pa.BufferOutputStream(), table.schema, filesystem=filesystem
        )


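# By default the writer embeds the serialized Arrow schema in the Parquet
# key-value metadata under the b'ARROW:schema' key; store_schema=False
# omits it, leaving no file-level metadata at all for this table.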
def test_parquet_writer_store_schema(tempdir):
    table = pa.table({'a': [1, 2, 3]})

    # default -> write schema information
    path1 = tempdir / 'test_with_schema.parquet'
    with pq.ParquetWriter(path1, table.schema) as writer:
        writer.write_table(table)

    meta = pq.read_metadata(path1)
    assert b'ARROW:schema' in meta.metadata
    assert meta.metadata[b'ARROW:schema']

    # disable adding schema information
    path2 = tempdir / 'test_without_schema.parquet'
    with pq.ParquetWriter(path2, table.schema, store_schema=False) as writer:
        writer.write_table(table)

    meta = pq.read_metadata(path2)
    assert meta.metadata is None


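# add_key_value_metadata() may be called multiple times before the writer
# is closed; the entries are merged into the footer metadata and a later
# value for the same key overrides the earlier one.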
def test_parquet_writer_append_key_value_metadata(tempdir):
    table = pa.Table.from_arrays([pa.array([], type='int32')], ['f0'])
    path = tempdir / 'metadata.parquet'

    with pq.ParquetWriter(path, table.schema) as writer:
        writer.write_table(table)
        writer.add_key_value_metadata({'key1': '1', 'key2': 'x'})
        writer.add_key_value_metadata({'key2': '2', 'key3': '3'})
    reader = pq.ParquetFile(path)
    metadata = reader.metadata.metadata
    assert metadata[b'key1'] == b'1'
    assert metadata[b'key2'] == b'2'
    assert metadata[b'key3'] == b'3'


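# Content-defined chunking changes where data pages are split inside a
# column chunk, not how rows are assigned to row groups, so the data
# round-trips unchanged and the row-group layout matches the unchunked
# file while the total byte sizes differ.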
def test_parquet_content_defined_chunking(tempdir):
    table = pa.table({'a': range(100_000)})

    # use PLAIN encoding because we compare the overall size of the row groups
    # which would vary depending on the encoding making the assertions wrong
    pq.write_table(table, tempdir / 'unchunked.parquet',
                   use_dictionary=False,
                   column_encoding="PLAIN")
    pq.write_table(table, tempdir / 'chunked-default.parquet',
                   use_dictionary=False,
                   column_encoding="PLAIN",
                   use_content_defined_chunking=True)
    pq.write_table(table, tempdir / 'chunked-custom.parquet',
                   use_dictionary=False,
                   column_encoding="PLAIN",
                   use_content_defined_chunking={"min_chunk_size": 32_768,
                                                 "max_chunk_size": 65_536})

    # the data must be the same
    unchunked = pq.read_table(tempdir / 'unchunked.parquet')
    chunked_default = pq.read_table(tempdir / 'chunked-default.parquet')
    chunked_custom = pq.read_table(tempdir / 'chunked-custom.parquet')
    assert unchunked.equals(chunked_default)
    assert unchunked.equals(chunked_custom)

    # number of row groups and their sizes are not affected by content
    # defined chunking
    unchunked_metadata = pq.read_metadata(tempdir / 'unchunked.parquet')
    chunked_default_metadata = pq.read_metadata(tempdir / 'chunked-default.parquet')
    chunked_custom_metadata = pq.read_metadata(tempdir / 'chunked-custom.parquet')

    assert unchunked_metadata.num_row_groups == chunked_default_metadata.num_row_groups
    assert unchunked_metadata.num_row_groups == chunked_custom_metadata.num_row_groups

    for i in range(unchunked_metadata.num_row_groups):
        rg_unchunked = unchunked_metadata.row_group(i)
        rg_chunked_default = chunked_default_metadata.row_group(i)
        rg_chunked_custom = chunked_custom_metadata.row_group(i)
        assert rg_unchunked.num_rows == rg_chunked_default.num_rows
        assert rg_unchunked.num_rows == rg_chunked_custom.num_rows
        # since PageReader is not exposed we cannot inspect the page sizes
        # so just check that the total byte size is different
        assert rg_unchunked.total_byte_size < rg_chunked_default.total_byte_size
        assert rg_unchunked.total_byte_size < rg_chunked_custom.total_byte_size
        assert rg_chunked_default.total_byte_size < rg_chunked_custom.total_byte_size


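# use_content_defined_chunking accepts either True (default parameters) or
# a dict that must provide both min_chunk_size and max_chunk_size,
# optionally with norm_level; unknown or missing keys are rejected.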
def test_parquet_content_defined_chunking_parameters(tempdir):
    table = pa.table({'a': range(100)})
    path = tempdir / 'chunked-invalid.parquet'

    # it raises OSError, not ideal but this is how parquet exceptions are
    # handled currently
    msg = "max_chunk_size must be greater than min_chunk_size"
    with pytest.raises(Exception, match=msg):
        cdc_options = {"min_chunk_size": 65_536, "max_chunk_size": 32_768}
        pq.write_table(table, path, use_content_defined_chunking=cdc_options)

    cases = [
        (
            {"min_chunk_size": 64 * 1024, "unknown_option": True},
            "Unknown options in 'use_content_defined_chunking': {'unknown_option'}"
        ),
        (
            {"min_chunk_size": 64 * 1024},
            "Missing options in 'use_content_defined_chunking': {'max_chunk_size'}"
        ),
        (
            {"max_chunk_size": 64 * 1024},
            "Missing options in 'use_content_defined_chunking': {'min_chunk_size'}"
        )
    ]
    for cdc_options, msg in cases:
        with pytest.raises(ValueError, match=msg):
            pq.write_table(table, path, use_content_defined_chunking=cdc_options)

    # using the default parametrization
    pq.write_table(table, path, use_content_defined_chunking=True)

    # using min_chunk_size and max_chunk_size
    cdc_options = {"min_chunk_size": 32_768, "max_chunk_size": 65_536}
    pq.write_table(table, path, use_content_defined_chunking=cdc_options)

    # using min_chunk_size, max_chunk_size and norm_level
    cdc_options = {"min_chunk_size": 32_768, "max_chunk_size": 65_536, "norm_level": 1}
    pq.write_table(table, path, use_content_defined_chunking=cdc_options)