try: # Python 2.7: use the C pickle to speed up # test_concurrency_safe_write which pickles big python objects import cPickle as cpickle except ImportError: import pickle as cpickle import functools import time from pickle import PicklingError import pytest from joblib import Parallel, delayed from joblib._store_backends import ( CacheWarning, FileSystemStoreBackend, concurrency_safe_write, ) from joblib.backports import concurrency_safe_rename from joblib.test.common import with_multiprocessing from joblib.testing import parametrize, timeout def write_func(output, filename): with open(filename, "wb") as f: cpickle.dump(output, f) def load_func(expected, filename): for i in range(10): try: with open(filename, "rb") as f: reloaded = cpickle.load(f) break except (OSError, IOError): # On Windows you can have WindowsError ([Error 5] Access # is denied or [Error 13] Permission denied) when reading the file, # probably because a writer process has a lock on the file time.sleep(0.1) else: raise assert expected == reloaded def concurrency_safe_write_rename(to_write, filename, write_func): temporary_filename = concurrency_safe_write(to_write, filename, write_func) concurrency_safe_rename(temporary_filename, filename) @timeout(0) # No timeout as this test can be long @with_multiprocessing @parametrize("backend", ["multiprocessing", "loky", "threading"]) def test_concurrency_safe_write(tmpdir, backend): # Add one item to cache filename = tmpdir.join("test.pkl").strpath obj = {str(i): i for i in range(int(1e5))} funcs = [ functools.partial(concurrency_safe_write_rename, write_func=write_func) if i % 3 != 2 else load_func for i in range(12) ] Parallel(n_jobs=2, backend=backend)(delayed(func)(obj, filename) for func in funcs) def test_warning_on_dump_failure(tmpdir): # Check that a warning is raised when the dump fails for any reason but # a PicklingError. class UnpicklableObject(object): def __reduce__(self): raise RuntimeError("some exception") backend = FileSystemStoreBackend() backend.location = tmpdir.join("test_warning_on_pickling_error").strpath backend.compress = None with pytest.warns(CacheWarning, match="some exception"): backend.dump_item("testpath", UnpicklableObject()) def test_warning_on_pickling_error(tmpdir): # This is separate from test_warning_on_dump_failure because in the # future we will turn this into an exception. class UnpicklableObject(object): def __reduce__(self): raise PicklingError("not picklable") backend = FileSystemStoreBackend() backend.location = tmpdir.join("test_warning_on_pickling_error").strpath backend.compress = None with pytest.warns(FutureWarning, match="not picklable"): backend.dump_item("testpath", UnpicklableObject())