Source code for glideinwms.unittests.test_lib_DiskCache

#!/usr/bin/env python3

# SPDX-FileCopyrightText: 2009 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0

"""
Project:
   glideinWMS

 Description:
   unit test for glideinwms/creation/lib/cvWParams.py

 Author:
   Dennis Box dbox@fnal.gov
"""

import os
import tempfile
import time
import unittest

import xmlrunner

from glideinwms.lib.disk_cache import DiskCache


[docs] class TestDiskCache(unittest.TestCase): """Test the DiskCache class"""
[docs] def setUp(self): self.obj = "I am the object to be saved" self.objid = "objid" self.cache = DiskCache(".")
[docs] def tearDown(self): os.remove(self.objid) os.remove(self.objid + ".lock")
[docs] def test_normal(self): """Test a few things""" # No cache file exists self.assertFalse(os.path.isfile(self.objid)) # The cache is empty at the beginning cached_obj = self.cache.get(self.objid) self.assertIsNone(cached_obj) # Save the object and check the cache file exists now self.cache.save(self.objid, self.obj) self.assertTrue(os.path.isfile(self.objid)) # Check we get the object from the cache cached_obj = self.cache.get(self.objid) self.assertEqual(self.obj, cached_obj) # Create a new cache (e.g., from another process) and verify we get the object from file cache = DiskCache(".") cached_obj = cache.get(self.objid) self.assertEqual(self.obj, cached_obj) # But from another directory we do not hit the cache new_location = tempfile.mkdtemp() cache = DiskCache(new_location) cached_obj = cache.get(self.objid) self.assertIsNone(cached_obj) os.rmdir(new_location) # And now what happens if we let the cache expire? We do not get the object! self.cache.cache_duration = 2 # 2 seconds time.sleep(2) cached_obj = self.cache.get(self.objid) self.assertIsNone(cached_obj) # But if I save it again it works (assuming this takes less than 2 seconds to be executes: # we are not running on potatoes) self.cache.save(self.objid, self.obj) cached_obj = self.cache.get(self.objid) self.assertEqual(self.obj, cached_obj) # It works also on another cache with empty memory cache cache = DiskCache(".") cached_obj = cache.get(self.objid) self.assertEqual(self.obj, cached_obj)
if __name__ == "__main__": unittest.main(testRunner=xmlrunner.XMLTestRunner(output="unittests-reports"))