kubernetes_asyncio.leaderelection.leaderelection_test module
- class kubernetes_asyncio.leaderelection.leaderelection_test.LeaderElectionTest(methodName='runTest')
Bases:
IsolatedAsyncioTestCase- assert_history(history, expected) None
- async test_leader_election() None
- async test_leader_election_with_renew_deadline() None
- async test_simple_leader_election() None
- class kubernetes_asyncio.leaderelection.leaderelection_test.MockResourceLock(name: str, namespace: str, identity: str, shared_lock: Lock, on_create: Callable, on_update: Callable, on_change: Callable, on_try_update: Callable | None)
Bases:
BaseLock- async create(name: str, namespace: str, election_record: LeaderElectionRecord) bool
- Parameters:
electionRecord – Annotation string
name – Name of the configmap object to be created
namespace – Namespace in which the configmap object is to be created
- Returns:
‘True’ if object is created else ‘False’ if failed
- async get(name: str, namespace: str) tuple[bool, LeaderElectionRecord] | tuple[bool, Exception] | tuple[bool, None]
- Parameters:
name – Name of the configmap object information to get
namespace – Namespace in which the configmap object is to be searched
- Returns:
‘True, election record’ if object found else ‘False, exception response’
- async update(name: str, namespace: str, updated_record: LeaderElectionRecord) bool
- Parameters:
name – name of the lock to be updated
namespace – namespace the lock is in
updated_record – the updated election record
- Returns:
True if update is successful False if it fails