Skip to content

Commit 842017b

Browse files
Fix S4U tests
These tests relied on rickety behavior and caching. Redo them to actually test impersonation. Fixes problems with krb5 >= 1.18.2. Resolves: #220 Signed-off-by: Robbie Harwood <[email protected]>
1 parent 743af8e commit 842017b

File tree

2 files changed

+76
-60
lines changed

2 files changed

+76
-60
lines changed

gssapi/tests/test_high_level.py

Lines changed: 33 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -312,39 +312,49 @@ def test_pickle_unpickle(self):
312312
usage='initiate')
313313
@ktu.gssapi_extension_test('s4u', 'S4U')
314314
def test_impersonate(self, str_name, kwargs):
315-
target_name = gssnames.Name(TARGET_SERVICE_NAME,
316-
gb.NameType.hostbased_service)
317-
# TODO(directxman12): make this use the high-level SecurityContext
318-
client_ctx_resp = gb.init_sec_context(target_name)
319-
client_token = client_ctx_resp[3]
320-
del client_ctx_resp # free everything but the token
321-
322-
server_name = self.name
323-
server_creds = gsscreds.Credentials(name=server_name,
324-
usage='both')
325-
server_ctx_resp = gb.accept_sec_context(client_token,
326-
acceptor_creds=server_creds)
327-
328-
imp_creds = server_creds.impersonate(server_ctx_resp[1], **kwargs)
315+
server_name = gssnames.Name(SERVICE_PRINCIPAL,
316+
gb.NameType.kerberos_principal)
317+
318+
password = self.realm.password("user")
319+
self.realm.kinit(self.realm.user_princ, password=password,
320+
flags=["-f"])
321+
client_ctx = gssctx.SecurityContext(
322+
name=server_name, flags=gb.RequirementFlag.delegate_to_peer)
323+
client_token = client_ctx.step()
324+
325+
self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"])
326+
server_creds = gsscreds.Credentials(usage="both")
327+
server_ctx = gssctx.SecurityContext(creds=server_creds)
328+
server_ctx.step(client_token)
329+
self.assertTrue(server_ctx.complete)
330+
331+
imp_creds = server_ctx.delegated_creds.impersonate(server_name,
332+
**kwargs)
329333
self.assertIsInstance(imp_creds, gsscreds.Credentials)
330334

331335
@ktu.gssapi_extension_test('s4u', 'S4U')
332336
def test_add_with_impersonate(self):
333-
target_name = gssnames.Name(TARGET_SERVICE_NAME,
334-
gb.NameType.hostbased_service)
335-
client_ctx = gssctx.SecurityContext(name=target_name)
337+
server_name = gssnames.Name(SERVICE_PRINCIPAL,
338+
gb.NameType.kerberos_principal)
339+
340+
password = self.realm.password("user")
341+
self.realm.kinit(self.realm.user_princ, password=password,
342+
flags=["-f"])
343+
client_ctx = gssctx.SecurityContext(
344+
name=server_name, flags=gb.RequirementFlag.delegate_to_peer)
336345
client_token = client_ctx.step()
337346

338-
server_creds = gsscreds.Credentials(usage='both')
339-
server_ctx = gssctx.SecurityContext(creds=server_creds, usage='accept')
347+
self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"])
348+
server_creds = gsscreds.Credentials(usage="both")
349+
server_ctx = gssctx.SecurityContext(creds=server_creds)
340350
server_ctx.step(client_token)
351+
self.assertTrue(server_ctx.complete)
341352

342353
# use empty creds to test here
343354
input_creds = gsscreds.Credentials(gb.Creds())
344-
new_creds = input_creds.add(server_ctx.initiator_name,
345-
gb.MechType.kerberos,
346-
impersonator=server_creds,
347-
usage='initiate')
355+
new_creds = input_creds.add(
356+
server_name, gb.MechType.kerberos,
357+
impersonator=server_ctx.delegated_creds, usage='initiate')
348358
self.assertIsInstance(new_creds, gsscreds.Credentials)
349359

350360

gssapi/tests/test_raw.py

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ def setUp(self):
5858
def test_indicate_mechs(self):
5959
mechs = gb.indicate_mechs()
6060
self.assertIsInstance(mechs, set)
61-
self.assertGreater(len(mechs), 0)
6261
self.assertIn(gb.MechType.kerberos, mechs)
6362

6463
def test_import_name(self):
@@ -320,56 +319,63 @@ def test_inquire_context(self):
320319

321320
@ktu.gssapi_extension_test('s4u', 'S4U')
322321
def test_add_cred_impersonate_name(self):
323-
target_name = gb.import_name(TARGET_SERVICE_NAME,
324-
gb.NameType.hostbased_service)
325-
client_ctx_resp = gb.init_sec_context(target_name)
326-
client_token = client_ctx_resp[3]
327-
del client_ctx_resp # free all the things (except the token)!
328-
329322
server_name = gb.import_name(SERVICE_PRINCIPAL,
330323
gb.NameType.kerberos_principal)
331-
server_creds = gb.acquire_cred(server_name, usage='both')[0]
332-
server_ctx_resp = gb.accept_sec_context(client_token,
333-
acceptor_creds=server_creds)
324+
325+
password = self.realm.password('user')
326+
self.realm.kinit(self.realm.user_princ, password=password,
327+
flags=["-f"])
328+
name = gb.import_name(b"user", gb.NameType.kerberos_principal)
329+
client_creds = gb.acquire_cred(name, usage="initiate").creds
330+
cctx_res = gb.init_sec_context(
331+
server_name, creds=client_creds,
332+
flags=gb.RequirementFlag.delegate_to_peer)
333+
334+
self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"])
335+
server_creds = gb.acquire_cred(server_name, usage="both").creds
336+
sctx_res = gb.accept_sec_context(cctx_res.token, server_creds)
337+
self.assertTrue(gb.inquire_context(sctx_res.context).complete)
334338

335339
input_creds = gb.Creds()
336340
imp_resp = gb.add_cred_impersonate_name(input_creds,
337-
server_creds,
338-
server_ctx_resp[1],
341+
sctx_res.delegated_creds,
342+
server_name,
339343
gb.MechType.kerberos)
340344
self.assertIsNotNone(imp_resp)
341-
342-
new_creds, actual_mechs, output_init_ttl, output_accept_ttl = imp_resp
343-
self.assertIsInstance(new_creds, gb.Creds)
344-
self.assertIn(gb.MechType.kerberos, actual_mechs)
345-
self.assertIsInstance(output_init_ttl, int)
346-
self.assertIsInstance(output_accept_ttl, int)
345+
self.assertIsInstance(imp_resp, gb.AddCredResult)
346+
self.assertIsInstance(imp_resp.creds, gb.Creds)
347+
self.assertIn(gb.MechType.kerberos, imp_resp.mechs)
348+
self.assertIsInstance(imp_resp.init_lifetime, int)
349+
self.assertGreater(imp_resp.init_lifetime, 0)
350+
self.assertIsInstance(imp_resp.accept_lifetime, int)
351+
self.assertEqual(imp_resp.accept_lifetime, 0)
347352

348353
@ktu.gssapi_extension_test('s4u', 'S4U')
349354
def test_acquire_creds_impersonate_name(self):
350-
target_name = gb.import_name(TARGET_SERVICE_NAME,
351-
gb.NameType.hostbased_service)
352-
client_ctx_resp = gb.init_sec_context(target_name)
353-
client_token = client_ctx_resp[3]
354-
del client_ctx_resp # free all the things (except the token)!
355-
356355
server_name = gb.import_name(SERVICE_PRINCIPAL,
357356
gb.NameType.kerberos_principal)
358-
server_creds = gb.acquire_cred(server_name, usage='both')[0]
359-
server_ctx_resp = gb.accept_sec_context(client_token,
360-
acceptor_creds=server_creds)
361357

362-
imp_resp = gb.acquire_cred_impersonate_name(server_creds,
363-
server_ctx_resp[1])
364-
self.assertIsNotNone(imp_resp)
365-
366-
imp_creds, actual_mechs, output_ttl = imp_resp
367-
self.assertIsInstance(imp_creds, gb.Creds)
368-
self.assertIn(gb.MechType.kerberos, actual_mechs)
369-
self.assertIsInstance(output_ttl, int)
358+
password = self.realm.password('user')
359+
self.realm.kinit(self.realm.user_princ, password=password,
360+
flags=["-f"])
361+
name = gb.import_name(b'user', gb.NameType.kerberos_principal)
362+
client_creds = gb.acquire_cred(name, usage="initiate").creds
363+
cctx_res = gb.init_sec_context(
364+
server_name, creds=client_creds,
365+
flags=gb.RequirementFlag.delegate_to_peer)
370366

371-
# no need to explicitly release any more -- we can just rely on
372-
# __dealloc__ (b/c cython)
367+
self.realm.kinit(SERVICE_PRINCIPAL.decode("utf-8"), flags=["-k"])
368+
server_creds = gb.acquire_cred(server_name, usage='both').creds
369+
sctx_res = gb.accept_sec_context(cctx_res.token, server_creds)
370+
self.assertTrue(gb.inquire_context(sctx_res.context).complete)
371+
372+
imp_resp = gb.acquire_cred_impersonate_name(sctx_res.delegated_creds,
373+
server_name)
374+
self.assertIsInstance(imp_resp, gb.AcquireCredResult)
375+
self.assertIsInstance(imp_resp.creds, gb.Creds)
376+
self.assertIn(gb.MechType.kerberos, imp_resp.mechs)
377+
self.assertIsInstance(imp_resp.lifetime, int)
378+
self.assertGreater(imp_resp.lifetime, 0)
373379

374380
@ktu.gssapi_extension_test('s4u', 'S4U')
375381
@ktu.krb_minversion_test('1.11',

0 commit comments

Comments
 (0)