620 lines
23 KiB
Python
620 lines
23 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
import pytest
|
|
from datetime import timedelta
|
|
|
|
import pyarrow as pa
|
|
try:
|
|
import pyarrow.parquet as pq
|
|
import pyarrow.parquet.encryption as pe
|
|
except ImportError:
|
|
pq = None
|
|
pe = None
|
|
else:
|
|
from pyarrow.tests.parquet.encryption import (
|
|
InMemoryKmsClient, verify_file_encrypted)
|
|
|
|
|
|
PARQUET_NAME = 'encrypted_table.in_mem.parquet'
|
|
FOOTER_KEY = b"0123456789112345"
|
|
FOOTER_KEY_NAME = "footer_key"
|
|
COL_KEY = b"1234567890123450"
|
|
COL_KEY_NAME = "col_key"
|
|
|
|
|
|
# Marks all of the tests in this module
|
|
# Ignore these with pytest ... -m 'not parquet_encryption'
|
|
# Ignore these with pytest ... -m 'not parquet'
|
|
pytestmark = [
|
|
pytest.mark.parquet_encryption,
|
|
pytest.mark.parquet
|
|
]
|
|
|
|
|
|
@pytest.fixture(scope='module')
|
|
def data_table():
|
|
data_table = pa.Table.from_pydict({
|
|
'a': pa.array([1, 2, 3]),
|
|
'b': pa.array(['a', 'b', 'c']),
|
|
'c': pa.array(['x', 'y', 'z'])
|
|
})
|
|
return data_table
|
|
|
|
|
|
@pytest.fixture(scope='module')
|
|
def basic_encryption_config():
|
|
basic_encryption_config = pe.EncryptionConfiguration(
|
|
footer_key=FOOTER_KEY_NAME,
|
|
column_keys={
|
|
COL_KEY_NAME: ["a", "b"],
|
|
})
|
|
return basic_encryption_config
|
|
|
|
|
|
def setup_encryption_environment(custom_kms_conf):
|
|
"""
|
|
Sets up and returns the KMS connection configuration and crypto factory
|
|
based on provided KMS configuration parameters.
|
|
"""
|
|
kms_connection_config = pe.KmsConnectionConfig(custom_kms_conf=custom_kms_conf)
|
|
|
|
def kms_factory(kms_connection_configuration):
|
|
return InMemoryKmsClient(kms_connection_configuration)
|
|
|
|
# Create our CryptoFactory
|
|
crypto_factory = pe.CryptoFactory(kms_factory)
|
|
|
|
return kms_connection_config, crypto_factory
|
|
|
|
|
|
def write_encrypted_file(path, data_table, footer_key_name, col_key_name,
|
|
footer_key, col_key, encryption_config):
|
|
"""
|
|
Writes an encrypted parquet file based on the provided parameters.
|
|
"""
|
|
# Setup the custom KMS configuration with provided keys
|
|
custom_kms_conf = {
|
|
footer_key_name: footer_key.decode("UTF-8"),
|
|
col_key_name: col_key.decode("UTF-8"),
|
|
}
|
|
|
|
# Setup encryption environment
|
|
kms_connection_config, crypto_factory = setup_encryption_environment(
|
|
custom_kms_conf)
|
|
|
|
# Write the encrypted parquet file
|
|
write_encrypted_parquet(path, data_table, encryption_config,
|
|
kms_connection_config, crypto_factory)
|
|
|
|
return kms_connection_config, crypto_factory
|
|
|
|
|
|
def test_encrypted_parquet_write_read(tempdir, data_table):
|
|
"""Write an encrypted parquet, verify it's encrypted, and then read it."""
|
|
path = tempdir / PARQUET_NAME
|
|
|
|
# Encrypt the footer with the footer key,
|
|
# encrypt column `a` and column `b` with another key,
|
|
# keep `c` plaintext
|
|
encryption_config = pe.EncryptionConfiguration(
|
|
footer_key=FOOTER_KEY_NAME,
|
|
column_keys={
|
|
COL_KEY_NAME: ["a", "b"],
|
|
},
|
|
encryption_algorithm="AES_GCM_V1",
|
|
cache_lifetime=timedelta(minutes=5.0),
|
|
data_key_length_bits=256)
|
|
assert encryption_config.uniform_encryption is False
|
|
|
|
kms_connection_config, crypto_factory = write_encrypted_file(
|
|
path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME, FOOTER_KEY, COL_KEY,
|
|
encryption_config)
|
|
|
|
verify_file_encrypted(path)
|
|
|
|
# Read with decryption properties
|
|
decryption_config = pe.DecryptionConfiguration(
|
|
cache_lifetime=timedelta(minutes=5.0))
|
|
result_table = read_encrypted_parquet(
|
|
path, decryption_config, kms_connection_config, crypto_factory)
|
|
assert data_table.equals(result_table)
|
|
|
|
|
|
def test_uniform_encrypted_parquet_write_read(tempdir, data_table):
|
|
"""Write an encrypted parquet, verify it's encrypted, and then read it."""
|
|
path = tempdir / PARQUET_NAME
|
|
|
|
# Encrypt the footer and all columns with the footer key,
|
|
encryption_config = pe.EncryptionConfiguration(
|
|
footer_key=FOOTER_KEY_NAME,
|
|
uniform_encryption=True,
|
|
encryption_algorithm="AES_GCM_V1",
|
|
cache_lifetime=timedelta(minutes=5.0),
|
|
data_key_length_bits=256)
|
|
assert encryption_config.uniform_encryption is True
|
|
|
|
kms_connection_config, crypto_factory = write_encrypted_file(
|
|
path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME, FOOTER_KEY, b"",
|
|
encryption_config)
|
|
|
|
verify_file_encrypted(path)
|
|
|
|
# Read with decryption properties
|
|
decryption_config = pe.DecryptionConfiguration(
|
|
cache_lifetime=timedelta(minutes=5.0))
|
|
result_table = read_encrypted_parquet(
|
|
path, decryption_config, kms_connection_config, crypto_factory)
|
|
assert data_table.equals(result_table)
|
|
|
|
|
|
def write_encrypted_parquet(path, table, encryption_config,
|
|
kms_connection_config, crypto_factory):
|
|
file_encryption_properties = crypto_factory.file_encryption_properties(
|
|
kms_connection_config, encryption_config)
|
|
assert file_encryption_properties is not None
|
|
with pq.ParquetWriter(
|
|
path, table.schema,
|
|
encryption_properties=file_encryption_properties) as writer:
|
|
writer.write_table(table)
|
|
|
|
|
|
def read_encrypted_parquet(path, decryption_config,
|
|
kms_connection_config, crypto_factory):
|
|
file_decryption_properties = crypto_factory.file_decryption_properties(
|
|
kms_connection_config, decryption_config)
|
|
assert file_decryption_properties is not None
|
|
meta = pq.read_metadata(
|
|
path, decryption_properties=file_decryption_properties)
|
|
assert meta.num_columns == 3
|
|
schema = pq.read_schema(
|
|
path, decryption_properties=file_decryption_properties)
|
|
assert len(schema.names) == 3
|
|
|
|
result = pq.ParquetFile(
|
|
path, decryption_properties=file_decryption_properties)
|
|
return result.read(use_threads=True)
|
|
|
|
|
|
def test_encrypted_parquet_write_read_wrong_key(tempdir, data_table):
|
|
"""Write an encrypted parquet, verify it's encrypted,
|
|
and then read it using wrong keys."""
|
|
path = tempdir / PARQUET_NAME
|
|
|
|
# Encrypt the footer with the footer key,
|
|
# encrypt column `a` and column `b` with another key,
|
|
# keep `c` plaintext
|
|
encryption_config = pe.EncryptionConfiguration(
|
|
footer_key=FOOTER_KEY_NAME,
|
|
column_keys={
|
|
COL_KEY_NAME: ["a", "b"],
|
|
},
|
|
encryption_algorithm="AES_GCM_V1",
|
|
cache_lifetime=timedelta(minutes=5.0),
|
|
data_key_length_bits=256)
|
|
|
|
write_encrypted_file(path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME,
|
|
FOOTER_KEY, COL_KEY, encryption_config)
|
|
|
|
verify_file_encrypted(path)
|
|
|
|
wrong_kms_connection_config, wrong_crypto_factory = setup_encryption_environment({
|
|
FOOTER_KEY_NAME: COL_KEY.decode("UTF-8"), # Intentionally wrong
|
|
COL_KEY_NAME: FOOTER_KEY.decode("UTF-8"), # Intentionally wrong
|
|
})
|
|
|
|
decryption_config = pe.DecryptionConfiguration(
|
|
cache_lifetime=timedelta(minutes=5.0))
|
|
with pytest.raises(ValueError, match=r"Incorrect master key used"):
|
|
read_encrypted_parquet(
|
|
path, decryption_config, wrong_kms_connection_config,
|
|
wrong_crypto_factory)
|
|
|
|
|
|
def test_encrypted_parquet_read_no_decryption_config(tempdir, data_table):
|
|
"""Write an encrypted parquet, verify it's encrypted,
|
|
but then try to read it without decryption properties."""
|
|
test_encrypted_parquet_write_read(tempdir, data_table)
|
|
# Read without decryption properties
|
|
with pytest.raises(IOError, match=r"no decryption"):
|
|
pq.ParquetFile(tempdir / PARQUET_NAME).read()
|
|
|
|
|
|
def test_encrypted_parquet_read_metadata_no_decryption_config(
|
|
tempdir, data_table):
|
|
"""Write an encrypted parquet, verify it's encrypted,
|
|
but then try to read its metadata without decryption properties."""
|
|
test_encrypted_parquet_write_read(tempdir, data_table)
|
|
# Read metadata without decryption properties
|
|
with pytest.raises(IOError, match=r"no decryption"):
|
|
pq.read_metadata(tempdir / PARQUET_NAME)
|
|
|
|
|
|
def test_encrypted_parquet_read_schema_no_decryption_config(
|
|
tempdir, data_table):
|
|
"""Write an encrypted parquet, verify it's encrypted,
|
|
but then try to read its schema without decryption properties."""
|
|
test_encrypted_parquet_write_read(tempdir, data_table)
|
|
with pytest.raises(IOError, match=r"no decryption"):
|
|
pq.read_schema(tempdir / PARQUET_NAME)
|
|
|
|
|
|
def test_encrypted_parquet_write_no_col_key(tempdir, data_table):
|
|
"""Write an encrypted parquet, but give only footer key,
|
|
without column key."""
|
|
path = tempdir / 'encrypted_table_no_col_key.in_mem.parquet'
|
|
|
|
# Encrypt the footer with the footer key
|
|
encryption_config = pe.EncryptionConfiguration(
|
|
footer_key=FOOTER_KEY_NAME)
|
|
|
|
with pytest.raises(OSError,
|
|
match="Either column_keys or uniform_encryption "
|
|
"must be set"):
|
|
# Write with encryption properties
|
|
write_encrypted_file(path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME,
|
|
FOOTER_KEY, b"", encryption_config)
|
|
|
|
|
|
def test_encrypted_parquet_write_col_key_and_uniform_encryption(tempdir, data_table):
|
|
"""Write an encrypted parquet, but give only footer key,
|
|
without column key."""
|
|
path = tempdir / 'encrypted_table_col_key_and_uniform_encryption.in_mem.parquet'
|
|
|
|
# Encrypt the footer with the footer key
|
|
encryption_config = pe.EncryptionConfiguration(
|
|
footer_key=FOOTER_KEY_NAME,
|
|
column_keys={
|
|
COL_KEY_NAME: ["a", "b"],
|
|
},
|
|
uniform_encryption=True)
|
|
|
|
with pytest.raises(OSError,
|
|
match=r"Cannot set both column_keys and uniform_encryption"):
|
|
# Write with encryption properties
|
|
write_encrypted_file(path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME,
|
|
FOOTER_KEY, b"", encryption_config)
|
|
|
|
|
|
def test_encrypted_parquet_write_kms_error(tempdir, data_table,
|
|
basic_encryption_config):
|
|
"""Write an encrypted parquet, but raise KeyError in KmsClient."""
|
|
path = tempdir / 'encrypted_table_kms_error.in_mem.parquet'
|
|
encryption_config = basic_encryption_config
|
|
|
|
# Empty master_keys_map
|
|
kms_connection_config = pe.KmsConnectionConfig()
|
|
|
|
def kms_factory(kms_connection_configuration):
|
|
# Empty master keys map will cause KeyError to be raised
|
|
# on wrap/unwrap calls
|
|
return InMemoryKmsClient(kms_connection_configuration)
|
|
|
|
crypto_factory = pe.CryptoFactory(kms_factory)
|
|
with pytest.raises(KeyError, match="footer_key"):
|
|
# Write with encryption properties
|
|
write_encrypted_parquet(path, data_table, encryption_config,
|
|
kms_connection_config, crypto_factory)
|
|
|
|
|
|
def test_encrypted_parquet_write_kms_specific_error(tempdir, data_table,
|
|
basic_encryption_config):
|
|
"""Write an encrypted parquet, but raise KeyError in KmsClient."""
|
|
path = tempdir / 'encrypted_table_kms_error.in_mem.parquet'
|
|
encryption_config = basic_encryption_config
|
|
|
|
# Empty master_keys_map
|
|
kms_connection_config = pe.KmsConnectionConfig()
|
|
|
|
class ThrowingKmsClient(pe.KmsClient):
|
|
"""A KmsClient implementation that throws exception in
|
|
wrap/unwrap calls
|
|
"""
|
|
|
|
def __init__(self, config):
|
|
"""Create an InMemoryKmsClient instance."""
|
|
pe.KmsClient.__init__(self)
|
|
self.config = config
|
|
|
|
def wrap_key(self, key_bytes, master_key_identifier):
|
|
raise ValueError("Cannot Wrap Key")
|
|
|
|
def unwrap_key(self, wrapped_key, master_key_identifier):
|
|
raise ValueError("Cannot Unwrap Key")
|
|
|
|
def kms_factory(kms_connection_configuration):
|
|
# Exception thrown in wrap/unwrap calls
|
|
return ThrowingKmsClient(kms_connection_configuration)
|
|
|
|
crypto_factory = pe.CryptoFactory(kms_factory)
|
|
with pytest.raises(ValueError, match="Cannot Wrap Key"):
|
|
# Write with encryption properties
|
|
write_encrypted_parquet(path, data_table, encryption_config,
|
|
kms_connection_config, crypto_factory)
|
|
|
|
|
|
def test_encrypted_parquet_write_kms_factory_error(tempdir, data_table,
|
|
basic_encryption_config):
|
|
"""Write an encrypted parquet, but raise ValueError in kms_factory."""
|
|
path = tempdir / 'encrypted_table_kms_factory_error.in_mem.parquet'
|
|
encryption_config = basic_encryption_config
|
|
|
|
# Empty master_keys_map
|
|
kms_connection_config = pe.KmsConnectionConfig()
|
|
|
|
def kms_factory(kms_connection_configuration):
|
|
raise ValueError('Cannot create KmsClient')
|
|
|
|
crypto_factory = pe.CryptoFactory(kms_factory)
|
|
with pytest.raises(ValueError,
|
|
match="Cannot create KmsClient"):
|
|
# Write with encryption properties
|
|
write_encrypted_parquet(path, data_table, encryption_config,
|
|
kms_connection_config, crypto_factory)
|
|
|
|
|
|
def test_encrypted_parquet_write_kms_factory_type_error(
|
|
tempdir, data_table, basic_encryption_config):
|
|
"""Write an encrypted parquet, but use wrong KMS client type
|
|
that doesn't implement KmsClient."""
|
|
path = tempdir / 'encrypted_table_kms_factory_error.in_mem.parquet'
|
|
encryption_config = basic_encryption_config
|
|
|
|
# Empty master_keys_map
|
|
kms_connection_config = pe.KmsConnectionConfig()
|
|
|
|
class WrongTypeKmsClient():
|
|
"""This is not an implementation of KmsClient.
|
|
"""
|
|
|
|
def __init__(self, config):
|
|
self.master_keys_map = config.custom_kms_conf
|
|
|
|
def wrap_key(self, key_bytes, master_key_identifier):
|
|
return None
|
|
|
|
def unwrap_key(self, wrapped_key, master_key_identifier):
|
|
return None
|
|
|
|
def kms_factory(kms_connection_configuration):
|
|
return WrongTypeKmsClient(kms_connection_configuration)
|
|
|
|
crypto_factory = pe.CryptoFactory(kms_factory)
|
|
with pytest.raises(TypeError):
|
|
# Write with encryption properties
|
|
write_encrypted_parquet(path, data_table, encryption_config,
|
|
kms_connection_config, crypto_factory)
|
|
|
|
|
|
def test_encrypted_parquet_encryption_configuration():
|
|
def validate_encryption_configuration(encryption_config):
|
|
assert FOOTER_KEY_NAME == encryption_config.footer_key
|
|
assert ["a", "b"] == encryption_config.column_keys[COL_KEY_NAME]
|
|
assert "AES_GCM_CTR_V1" == encryption_config.encryption_algorithm
|
|
assert encryption_config.plaintext_footer
|
|
assert not encryption_config.double_wrapping
|
|
assert timedelta(minutes=10.0) == encryption_config.cache_lifetime
|
|
assert not encryption_config.internal_key_material
|
|
assert 192 == encryption_config.data_key_length_bits
|
|
|
|
encryption_config = pe.EncryptionConfiguration(
|
|
footer_key=FOOTER_KEY_NAME,
|
|
column_keys={COL_KEY_NAME: ["a", "b"], },
|
|
encryption_algorithm="AES_GCM_CTR_V1",
|
|
plaintext_footer=True,
|
|
double_wrapping=False,
|
|
cache_lifetime=timedelta(minutes=10.0),
|
|
internal_key_material=False,
|
|
data_key_length_bits=192,
|
|
)
|
|
validate_encryption_configuration(encryption_config)
|
|
|
|
encryption_config_1 = pe.EncryptionConfiguration(
|
|
footer_key=FOOTER_KEY_NAME)
|
|
encryption_config_1.column_keys = {COL_KEY_NAME: ["a", "b"], }
|
|
encryption_config_1.encryption_algorithm = "AES_GCM_CTR_V1"
|
|
encryption_config_1.plaintext_footer = True
|
|
encryption_config_1.double_wrapping = False
|
|
encryption_config_1.cache_lifetime = timedelta(minutes=10.0)
|
|
encryption_config_1.internal_key_material = False
|
|
encryption_config_1.data_key_length_bits = 192
|
|
validate_encryption_configuration(encryption_config_1)
|
|
|
|
|
|
def test_encrypted_parquet_decryption_configuration():
|
|
decryption_config = pe.DecryptionConfiguration(
|
|
cache_lifetime=timedelta(minutes=10.0))
|
|
assert timedelta(minutes=10.0) == decryption_config.cache_lifetime
|
|
|
|
decryption_config_1 = pe.DecryptionConfiguration()
|
|
decryption_config_1.cache_lifetime = timedelta(minutes=10.0)
|
|
assert timedelta(minutes=10.0) == decryption_config_1.cache_lifetime
|
|
|
|
|
|
def test_encrypted_parquet_kms_configuration():
|
|
def validate_kms_connection_config(kms_connection_config):
|
|
assert "Instance1" == kms_connection_config.kms_instance_id
|
|
assert "URL1" == kms_connection_config.kms_instance_url
|
|
assert "MyToken" == kms_connection_config.key_access_token
|
|
assert ({"key1": "key_material_1", "key2": "key_material_2"} ==
|
|
kms_connection_config.custom_kms_conf)
|
|
|
|
kms_connection_config = pe.KmsConnectionConfig(
|
|
kms_instance_id="Instance1",
|
|
kms_instance_url="URL1",
|
|
key_access_token="MyToken",
|
|
custom_kms_conf={
|
|
"key1": "key_material_1",
|
|
"key2": "key_material_2",
|
|
})
|
|
validate_kms_connection_config(kms_connection_config)
|
|
|
|
kms_connection_config_1 = pe.KmsConnectionConfig()
|
|
kms_connection_config_1.kms_instance_id = "Instance1"
|
|
kms_connection_config_1.kms_instance_url = "URL1"
|
|
kms_connection_config_1.key_access_token = "MyToken"
|
|
kms_connection_config_1.custom_kms_conf = {
|
|
"key1": "key_material_1",
|
|
"key2": "key_material_2",
|
|
}
|
|
validate_kms_connection_config(kms_connection_config_1)
|
|
|
|
|
|
@pytest.mark.xfail(reason="Plaintext footer - reading plaintext column subset"
|
|
" reads encrypted columns too")
|
|
def test_encrypted_parquet_write_read_plain_footer_single_wrapping(
|
|
tempdir, data_table):
|
|
"""Write an encrypted parquet, with plaintext footer
|
|
and with single wrapping,
|
|
verify it's encrypted, and then read plaintext columns."""
|
|
path = tempdir / PARQUET_NAME
|
|
|
|
# Encrypt the footer with the footer key,
|
|
# encrypt column `a` and column `b` with another key,
|
|
# keep `c` plaintext
|
|
encryption_config = pe.EncryptionConfiguration(
|
|
footer_key=FOOTER_KEY_NAME,
|
|
column_keys={
|
|
COL_KEY_NAME: ["a", "b"],
|
|
},
|
|
plaintext_footer=True,
|
|
double_wrapping=False)
|
|
|
|
kms_connection_config = pe.KmsConnectionConfig(
|
|
custom_kms_conf={
|
|
FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8"),
|
|
COL_KEY_NAME: COL_KEY.decode("UTF-8"),
|
|
}
|
|
)
|
|
|
|
def kms_factory(kms_connection_configuration):
|
|
return InMemoryKmsClient(kms_connection_configuration)
|
|
|
|
crypto_factory = pe.CryptoFactory(kms_factory)
|
|
# Write with encryption properties
|
|
write_encrypted_parquet(path, data_table, encryption_config,
|
|
kms_connection_config, crypto_factory)
|
|
|
|
# # Read without decryption properties only the plaintext column
|
|
# result = pq.ParquetFile(path)
|
|
# result_table = result.read(columns='c', use_threads=False)
|
|
# assert table.num_rows == result_table.num_rows
|
|
|
|
|
|
@pytest.mark.xfail(reason="External key material not supported yet")
|
|
def test_encrypted_parquet_write_external(tempdir, data_table):
|
|
"""Write an encrypted parquet, with external key
|
|
material.
|
|
Currently it's not implemented, so should throw
|
|
an exception"""
|
|
path = tempdir / PARQUET_NAME
|
|
|
|
# Encrypt the file with the footer key
|
|
encryption_config = pe.EncryptionConfiguration(
|
|
footer_key=FOOTER_KEY_NAME,
|
|
column_keys={},
|
|
internal_key_material=False)
|
|
|
|
kms_connection_config = pe.KmsConnectionConfig(
|
|
custom_kms_conf={FOOTER_KEY_NAME: FOOTER_KEY.decode("UTF-8")}
|
|
)
|
|
|
|
def kms_factory(kms_connection_configuration):
|
|
return InMemoryKmsClient(kms_connection_configuration)
|
|
|
|
crypto_factory = pe.CryptoFactory(kms_factory)
|
|
# Write with encryption properties
|
|
write_encrypted_parquet(path, data_table, encryption_config,
|
|
kms_connection_config, crypto_factory)
|
|
|
|
|
|
def test_encrypted_parquet_loop(tempdir, data_table, basic_encryption_config):
|
|
"""Write an encrypted parquet, verify it's encrypted,
|
|
and then read it multithreaded in a loop."""
|
|
path = tempdir / PARQUET_NAME
|
|
|
|
# Encrypt the footer with the footer key,
|
|
# encrypt column `a` and column `b` with another key,
|
|
# keep `c` plaintext, defined in basic_encryption_config
|
|
kms_connection_config, crypto_factory = write_encrypted_file(
|
|
path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME, FOOTER_KEY, COL_KEY,
|
|
basic_encryption_config)
|
|
|
|
verify_file_encrypted(path)
|
|
|
|
decryption_config = pe.DecryptionConfiguration(
|
|
cache_lifetime=timedelta(minutes=5.0))
|
|
|
|
for i in range(50):
|
|
# Read with decryption properties
|
|
file_decryption_properties = crypto_factory.file_decryption_properties(
|
|
kms_connection_config, decryption_config)
|
|
assert file_decryption_properties is not None
|
|
|
|
result = pq.ParquetFile(
|
|
path, decryption_properties=file_decryption_properties)
|
|
result_table = result.read(use_threads=True)
|
|
assert data_table.equals(result_table)
|
|
|
|
|
|
def test_read_with_deleted_crypto_factory(tempdir, data_table, basic_encryption_config):
|
|
"""
|
|
Test that decryption properties can be used if the crypto factory is no longer alive
|
|
"""
|
|
path = tempdir / PARQUET_NAME
|
|
kms_connection_config, crypto_factory = write_encrypted_file(
|
|
path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME, FOOTER_KEY, COL_KEY,
|
|
basic_encryption_config)
|
|
verify_file_encrypted(path)
|
|
|
|
# Create decryption properties and delete the crypto factory that created
|
|
# the properties afterwards.
|
|
decryption_config = pe.DecryptionConfiguration(
|
|
cache_lifetime=timedelta(minutes=5.0))
|
|
file_decryption_properties = crypto_factory.file_decryption_properties(
|
|
kms_connection_config, decryption_config)
|
|
del crypto_factory
|
|
|
|
result = pq.ParquetFile(
|
|
path, decryption_properties=file_decryption_properties)
|
|
result_table = result.read(use_threads=True)
|
|
assert data_table.equals(result_table)
|
|
|
|
|
|
def test_encrypted_parquet_read_table(tempdir, data_table, basic_encryption_config):
|
|
"""Write an encrypted parquet then read it back using read_table."""
|
|
path = tempdir / PARQUET_NAME
|
|
|
|
# Write the encrypted parquet file using the utility function
|
|
kms_connection_config, crypto_factory = write_encrypted_file(
|
|
path, data_table, FOOTER_KEY_NAME, COL_KEY_NAME, FOOTER_KEY, COL_KEY,
|
|
basic_encryption_config)
|
|
|
|
decryption_config = pe.DecryptionConfiguration(
|
|
cache_lifetime=timedelta(minutes=5.0))
|
|
file_decryption_properties = crypto_factory.file_decryption_properties(
|
|
kms_connection_config, decryption_config)
|
|
|
|
# Read the encrypted parquet file using read_table
|
|
result_table = pq.read_table(path, decryption_properties=file_decryption_properties)
|
|
|
|
# Assert that the read table matches the original data
|
|
assert data_table.equals(result_table)
|
|
|
|
# Read the encrypted parquet folder using read_table
|
|
result_table = pq.read_table(
|
|
tempdir, decryption_properties=file_decryption_properties)
|
|
assert data_table.equals(result_table)
|