diff --git a/gssapi/raw/__init__.py b/gssapi/raw/__init__.py index a17b4569..23fc755d 100644 --- a/gssapi/raw/__init__.py +++ b/gssapi/raw/__init__.py @@ -43,3 +43,11 @@ from gssapi.raw.ext_password_add import * # noqa except ImportError: pass + +# optional DCE (IOV/AEAD) support +try: + from gssapi.raw.ext_dce import * # noqa + # optional IOV MIC support (requires DCE support) + from gssapi.raw.ext_iov_mic import * # noqa +except ImportError: + pass diff --git a/gssapi/raw/ext_dce.pxd b/gssapi/raw/ext_dce.pxd new file mode 100644 index 00000000..7a7a6320 --- /dev/null +++ b/gssapi/raw/ext_dce.pxd @@ -0,0 +1,18 @@ +from gssapi.raw.cython_types cimport gss_buffer_desc, OM_uint32 + +cdef extern from "python_gssapi_ext.h": + ctypedef struct gss_iov_buffer_desc: + OM_uint32 type + gss_buffer_desc buffer + ctypedef gss_iov_buffer_desc* gss_iov_buffer_t + +cdef class IOV: + cdef int iov_len + cdef bint c_changed + + cdef bint _unprocessed + cdef list _buffs + cdef gss_iov_buffer_desc *_iov + + cdef gss_iov_buffer_desc* __cvalue__(IOV self) except NULL + cdef _recreate_python_values(IOV self) diff --git a/gssapi/raw/ext_dce.pyx b/gssapi/raw/ext_dce.pyx new file mode 100644 index 00000000..5c182108 --- /dev/null +++ b/gssapi/raw/ext_dce.pyx @@ -0,0 +1,548 @@ +GSSAPI="BASE" # This ensures that a full module is generated by Cython + +from libc.stdlib cimport malloc, calloc, free +from libc.string cimport memcpy + +from gssapi.raw.cython_types cimport * +from gssapi.raw.sec_contexts cimport SecurityContext + +from gssapi.raw.misc import GSSError, _EnumExtension +from gssapi.raw import types as gssapi_types +from gssapi.raw.named_tuples import IOVUnwrapResult, WrapResult, UnwrapResult +from collections import namedtuple, Sequence + +from enum import IntEnum +import six + +cdef extern from "python_gssapi_ext.h": + # NB(directxman12): this wiki page has a different argument order + # than the header file, and uses size_t instead of int + # (this file matches the header file) + OM_uint32 gss_wrap_iov(OM_uint32 *min_stat, gss_ctx_id_t ctx_handle, + int conf_req_flag, gss_qop_t qop_req, int *conf_ret, + gss_iov_buffer_desc *iov, int iov_count) nogil + + OM_uint32 gss_unwrap_iov(OM_uint32 *min_stat, gss_ctx_id_t ctx_handle, + int* conf_ret, gss_qop_t *qop_ret, + gss_iov_buffer_desc *iov, int iov_count) nogil + + OM_uint32 gss_wrap_iov_length(OM_uint32 *min_stat, gss_ctx_id_t ctx_handle, + int conf_req, gss_qop_t qop_req, + int *conf_ret, gss_iov_buffer_desc *iov, + int iov_count) nogil + + OM_uint32 gss_release_iov_buffer(OM_uint32 *min_stat, + gss_iov_buffer_desc *iov, + int iov_count) nogil + + OM_uint32 gss_wrap_aead(OM_uint32 *min_stat, gss_ctx_id_t ctx_handle, + int conf_req, gss_qop_t qop_req, + gss_buffer_t input_assoc_buffer, + gss_buffer_t input_payload_buffer, int *conf_ret, + gss_buffer_t output_message_buffer) nogil + + OM_uint32 gss_unwrap_aead(OM_uint32 *min_stat, gss_ctx_id_t ctx_handle, + gss_buffer_t input_message_buffer, + gss_buffer_t input_assoc_buffer, + gss_buffer_t output_payload_buffer, + int *conf_ret, gss_qop_t *qop_ret) nogil + + gss_iov_buffer_t GSS_C_NO_IOV_BUFFER + + OM_uint32 GSS_IOV_BUFFER_TYPE_EMPTY + OM_uint32 GSS_IOV_BUFFER_TYPE_DATA + OM_uint32 GSS_IOV_BUFFER_TYPE_HEADER + OM_uint32 GSS_IOV_BUFFER_TYPE_MECH_PARAMS + OM_uint32 GSS_IOV_BUFFER_TYPE_TRAILER + OM_uint32 GSS_IOV_BUFFER_TYPE_PADDING + OM_uint32 GSS_IOV_BUFFER_TYPE_STREAM + OM_uint32 GSS_IOV_BUFFER_TYPE_SIGN_ONLY + + OM_uint32 GSS_IOV_BUFFER_FLAG_MASK + OM_uint32 GSS_IOV_BUFFER_FLAG_ALLOCATE + OM_uint32 GSS_IOV_BUFFER_FLAG_ALLOCATED + + OM_uint32 GSS_C_DCE_STYLE + OM_uint32 GSS_C_IDENTIFY_FLAG + OM_uint32 GSS_C_EXTENDED_ERROR_FLAG + + +class IOVBufferType(IntEnum): + """ + IOV Buffer Types + + This IntEnum represent GSSAPI IOV buffer + types to be used with the IOV methods. + + The numbers behind the values correspond directly + to their C counterparts. + """ + + empty = GSS_IOV_BUFFER_TYPE_EMPTY + data = GSS_IOV_BUFFER_TYPE_DATA + header = GSS_IOV_BUFFER_TYPE_HEADER + mech_params = GSS_IOV_BUFFER_TYPE_MECH_PARAMS + trailer = GSS_IOV_BUFFER_TYPE_TRAILER + padding = GSS_IOV_BUFFER_TYPE_PADDING + stream = GSS_IOV_BUFFER_TYPE_STREAM + sign_only = GSS_IOV_BUFFER_TYPE_SIGN_ONLY + + +@six.add_metaclass(_EnumExtension) +class RequirementFlag(object): + __base__ = gssapi_types.RequirementFlag + + dce_style = GSS_C_DCE_STYLE + identify = GSS_C_IDENTIFY_FLAG + extended_error = GSS_C_EXTENDED_ERROR_FLAG + + +IOVBuffer = namedtuple('IOVBuffer', ['type', 'allocate', 'value']) + + +cdef class IOV: + # defined in ext_dce.pxd + + # cdef int iov_len + # cdef bint c_changed + + # cdef gss_iov_buffer_desc *_iov + # cdef bint _unprocessed + # cdef list _buffs + + AUTO_ALLOC_BUFFERS = set([IOVBufferType.header, IOVBufferType.padding, + IOVBufferType.trailer]) + + def __init__(IOV self, *args, std_layout=True, auto_alloc=True): + self._unprocessed = True + self.c_changed = False + + self._buffs = [] + + if std_layout: + self._buffs.append(IOVBuffer(IOVBufferType.header, + auto_alloc, None)) + + cdef char *val_copy + for buff_desc in args: + if isinstance(buff_desc, tuple): + if len(buff_desc) > 3 or len(buff_desc) < 2: + raise ValueError("Buffer description tuples must be " + "length 2 or 3") + + buff_type = buff_desc[0] + + if len(buff_desc) == 2: + if buff_type in self.AUTO_ALLOC_BUFFERS: + alloc = buff_desc[1] + data = None + else: + data = buff_desc[1] + alloc = False + else: + (buff_type, alloc, data) = buff_desc + + self._buffs.append(IOVBuffer(buff_type, alloc, data)) + elif isinstance(buff_desc, bytes): # assume type data + val = buff_desc + self._buffs.append(IOVBuffer(IOVBufferType.data, False, val)) + else: + alloc = False + if buff_desc in self.AUTO_ALLOC_BUFFERS: + alloc = auto_alloc + + self._buffs.append(IOVBuffer(buff_desc, alloc, None)) + + if std_layout: + self._buffs.append(IOVBuffer(IOVBufferType.padding, auto_alloc, + None)) + self._buffs.append(IOVBuffer(IOVBufferType.trailer, auto_alloc, + None)) + + cdef gss_iov_buffer_desc* __cvalue__(IOV self) except NULL: + cdef OM_uint32 tmp_min_stat + cdef int i + if self._unprocessed: + if self._iov is not NULL: + gss_release_iov_buffer(&tmp_min_stat, self._iov, self.iov_len) + free(self._iov) + + self.iov_len = len(self._buffs) + self._iov = calloc( + self.iov_len, sizeof(gss_iov_buffer_desc)) + if self._iov is NULL: + raise MemoryError("Cannot calloc for IOV buffer array") + + for i in range(self.iov_len): + buff = self._buffs[i] + self._iov[i].type = buff.type + + if buff.allocate: + self._iov[i].type |= GSS_IOV_BUFFER_FLAG_ALLOCATE + elif buff.allocate is None: + self._iov[i].type |= GSS_IOV_BUFFER_FLAG_ALLOCATED + + if buff.value is None: + self._iov[i].buffer.length = 0 + self._iov[i].buffer.value = NULL + else: + self._iov[i].buffer.length = len(buff.value) + self._iov[i].buffer.value = malloc( + self._iov[i].buffer.length) + if self._iov[i].buffer.value is NULL: + raise MemoryError("Cannot malloc for buffer value") + + memcpy(self._iov[i].buffer.value, buff.value, + self._iov[i].buffer.length) + + return self._iov + + cdef _recreate_python_values(IOV self): + cdef i + cdef bint val_change = False + cdef size_t new_len + for i in range(self.iov_len): + old_type = self._buffs[i].type + + if self._iov[i].buffer.value is NULL: + if self._iov[i].buffer.length == 0: + new_val = None + else: + new_len = self._iov[i].buffer.length + new_val = b'\x00' * new_len + else: + new_len = self._iov[i].buffer.length + new_val = self._iov[i].buffer.value[:new_len] + + alloc = False + if self._iov[i].type & GSS_IOV_BUFFER_FLAG_ALLOCATE: + alloc = True + + # NB(directxman12): GSSAPI (at least in MIT krb5) doesn't + # unset the allocate flag (because it's an "input flag", + # so this needs to come second and be separate + if self._iov[i].type & GSS_IOV_BUFFER_FLAG_ALLOCATED: + alloc = None + + self._buffs[i] = IOVBuffer(old_type, alloc, new_val) + + self.c_changed = False + + def __getitem__(IOV self, ind): + if self.c_changed: + self._recreate_python_values() + + return self._buffs[ind] + + def __len__(IOV self): + if self.c_changed: + self._recreate_python_values() + + return len(self._buffs) + + def __iter__(IOV self): + if self.c_changed: + self._recreate_python_values() + + for val in self._buffs: + yield val + + def __contains__(IOV self, item): + if self.c_changed: + self._recreate_python_values() + + return item in self._buffs + + def __reversed__(IOV self): + if self.c_changed: + self._recreate_python_values() + + for val in reversed(self._buffs): + yield val + + def index(IOV self, value): + for i, v in enumerate(self): + if v == value: + return i + + raise ValueError + + def count(IOV self, value): + return sum(1 for v in self if v == value) + + def __repr__(IOV self): + if self.c_changed: + self._recreate_python_values() + + return "<{module}.{name} {buffs}>".format( + module=type(self).__module__, name=type(self).__name__, + buffs=repr(self._buffs)) + + def __str__(IOV self): + buff_strs = [] + for buff in self: + type_val = str(buff.type).split('.')[1].upper() + if buff.value is None: + auto_alloc = buff.allocate + if auto_alloc: + buff_strs.append(type_val + "(allocate)") + else: + buff_strs.append(type_val + "(empty)") + else: + if buff.allocate is None: + alloc_str = ", allocated" + else: + alloc_str = "" + buff_strs.append("{0}({1!r}{2})".format(type_val, + buff.value, alloc_str)) + + return "".format(' | '.join(buff_strs)) + + def __dealloc__(IOV self): + cdef OM_uint32 tmp_min_stat + cdef int i + if self._iov is not NULL: + gss_release_iov_buffer(&tmp_min_stat, self._iov, self.iov_len) + + for i in range(self.iov_len): + if self._iov[i].buffer.value is not NULL: + free(self._iov[i].buffer.value) + + free(self._iov) + + +def wrap_iov(SecurityContext context not None, IOV message not None, + confidential=True, qop=None): + """ + Wrap/Encrypt an IOV message + + This method wraps or encrypts an IOV message. The allocate + parameter of the :class:`IOVBuffer` indicates whether or + not that particular buffer should be automatically allocated + (for use with padding, header, and trailer buffers). + + Args: + context (SecurityContext): the current security context + message (list): a list of :class:`IOVBuffer` objects + confidential (bool): whether or not to encrypt the message (True), + or just wrap it with a MIC (False) + qop (int): the desired Quality of Protection + (or None for the default QoP) + + Returns: + WrapResult: the wrapped/encrypted message (IOV list), and + whether or not encryption was actually used + + Raises: + GSSError + """ + + cdef int conf_req = confidential + cdef gss_qop_t qop_req = qop if qop is not None else GSS_C_QOP_DEFAULT + cdef int conf_used + + cdef gss_iov_buffer_desc *res_arr = message.__cvalue__() + + cdef OM_uint32 maj_stat, min_stat + + with nogil: + maj_stat = gss_wrap_iov(&min_stat, context.raw_ctx, conf_req, qop_req, + &conf_used, res_arr, message.iov_len) + + if maj_stat == GSS_S_COMPLETE: + message.c_changed = True + return conf_used + else: + raise GSSError(maj_stat, min_stat) + + +def unwrap_iov(SecurityContext context not None, IOV message not None): + """ + Unwrap/Decrypt an IOV message + + This method unwraps or decrypts an IOV message. The allocate + parameter of the :class:`IOVBuffer` indicates whether or + not that particular buffer should be automatically allocated + (for use with padding, header, and trailer buffers). + + As a special case, you may pass an entire IOV message + as a single 'stream'. In this case, pass a buffer type + of :attr:`IOVBufferType.stream` followed by a buffer type of + :attr:`IOVBufferType.data`. The former should contain the + entire IOV message, while the latter should be empty. + + Args: + context (SecurityContext): the current security context + message (list): a list of :class:`IOVBuffer` objects + + Returns: + UnwrapResult: the unwrapped/decrypted message, whether or not + encryption was used, and the QoP used + + Raises: + GSSError + """ + + cdef int conf_used + cdef gss_qop_t qop_used + cdef gss_iov_buffer_desc *res_arr = message.__cvalue__() + + cdef OM_uint32 maj_stat, min_stat + + with nogil: + maj_stat = gss_unwrap_iov(&min_stat, context.raw_ctx, &conf_used, + &qop_used, res_arr, message.iov_len) + + if maj_stat == GSS_S_COMPLETE: + message.c_changed = True + return IOVUnwrapResult(conf_used, qop_used) + else: + raise GSSError(maj_stat, min_stat) + + +def wrap_iov_length(SecurityContext context not None, IOV message not None, + confidential=True, qop=None): + """ + Appropriately size padding, trailer, and header IOV buffers + + This method sets the length values on the IOV buffers. You + should already have data provided for the data (and sign-only) + buffer(s) so that padding lengths can be appropriately computed. + + In Python terms, this will result in an appropriately sized + `bytes` object consisting of all zeros. + + Args: + context (SecurityContext): the current security context + message (list): a list of :class:`IOVBuffer` objects + + Returns: + WrapResult: a list of :class:IOVBuffer` objects, and whether or not + encryption was actually used + + Raises: + GSSError + """ + + cdef int conf_req = confidential + cdef gss_qop_t qop_req = qop if qop is not None else GSS_C_QOP_DEFAULT + cdef int conf_used + + cdef gss_iov_buffer_desc *res_arr = message.__cvalue__() + + cdef OM_uint32 maj_stat, min_stat + + with nogil: + maj_stat = gss_wrap_iov_length(&min_stat, context.raw_ctx, + conf_req, qop_req, + &conf_used, res_arr, message.iov_len) + + if maj_stat == GSS_S_COMPLETE: + message.c_changed = True + return conf_used + else: + raise GSSError(maj_stat, min_stat) + + +def wrap_aead(SecurityContext context not None, bytes message not None, + bytes associated=None, confidential=True, qop=None): + """ + Wrap/Encrypt an AEAD Message + + This method takes an input message and associated data, + and outputs and AEAD message. + + Args: + context (SecurityContext): the current security context + message (bytes): the message to wrap or encrypt + associated (bytes): associated data to go with the message + confidential (bool): whether or not to encrypt the message (True), + or just wrap it with a MIC (False) + qop (int): the desired Quality of Protection + (or None for the default QoP) + + Returns: + WrapResult: the wrapped/encrypted total message, and whether or not + encryption was actually used + + Raises: + GSSError + """ + + cdef int conf_req = confidential + cdef gss_qop_t qop_req = qop if qop is not None else GSS_C_QOP_DEFAULT + cdef gss_buffer_desc message_buffer = gss_buffer_desc(len(message), + message) + + cdef gss_buffer_t assoc_buffer_ptr = GSS_C_NO_BUFFER + cdef gss_buffer_desc assoc_buffer + if associated is not None: + assoc_buffer = gss_buffer_desc(len(associated), associated) + assoc_buffer_ptr = &assoc_buffer + + cdef int conf_used + # GSS_C_EMPTY_BUFFER + cdef gss_buffer_desc output_buffer = gss_buffer_desc(0, NULL) + + cdef OM_uint32 maj_stat, min_stat + + with nogil: + maj_stat = gss_wrap_aead(&min_stat, context.raw_ctx, conf_req, qop_req, + assoc_buffer_ptr, &message_buffer, + &conf_used, &output_buffer) + + if maj_stat == GSS_S_COMPLETE: + output_message = output_buffer.value[:output_buffer.length] + gss_release_buffer(&min_stat, &output_buffer) + return WrapResult(output_message, conf_used) + else: + raise GSSError(maj_stat, min_stat) + + +def unwrap_aead(SecurityContext context not None, bytes message not None, + bytes associated=None): + """ + Unwrap/Decrypt an AEAD Message + + This method takes an encrpyted/wrapped AEAD message and some associated + data, and returns an unwrapped/decrypted message. + + Args: + context (SecurityContext): the current security context + message (bytes): the AEAD message to unwrap or decrypt + associated (bytes): associated data that goes with the message + + Returns: + UnwrapResult: the unwrapped/decrypted message, whether or on + encryption was used, and the QoP used + + Raises: + GSSError + """ + + cdef gss_buffer_desc input_buffer = gss_buffer_desc(len(message), message) + + cdef gss_buffer_t assoc_buffer_ptr = GSS_C_NO_BUFFER + cdef gss_buffer_desc assoc_buffer + if associated is not None: + assoc_buffer = gss_buffer_desc(len(associated), associated) + assoc_buffer_ptr = &assoc_buffer + + # GSS_C_EMPTY_BUFFER + cdef gss_buffer_desc output_buffer = gss_buffer_desc(0, NULL) + cdef int conf_state + cdef gss_qop_t qop_state + + cdef OM_uint32 maj_stat, min_stat + + with nogil: + maj_stat = gss_unwrap_aead(&min_stat, context.raw_ctx, &input_buffer, + assoc_buffer_ptr, &output_buffer, + &conf_state, &qop_state) + + if maj_stat == GSS_S_COMPLETE: + output_message = output_buffer.value[:output_buffer.length] + gss_release_buffer(&min_stat, &output_buffer) + return UnwrapResult(output_message, conf_state, qop_state) + else: + raise GSSError(maj_stat, min_stat) diff --git a/gssapi/raw/ext_iov_mic.pyx b/gssapi/raw/ext_iov_mic.pyx new file mode 100644 index 00000000..a1cbd75f --- /dev/null +++ b/gssapi/raw/ext_iov_mic.pyx @@ -0,0 +1,145 @@ +GSSAPI="BASE" # This ensures that a full module is generated by Cython + +from gssapi.raw.cython_types cimport * +from gssapi.raw.sec_contexts cimport SecurityContext +from gssapi.raw.ext_dce cimport IOV, gss_iov_buffer_desc + +from gssapi.raw.misc import GSSError, _EnumExtension +from gssapi.raw import ext_dce + +import six + +cdef extern from "python_gssapi_ext.h": + OM_uint32 gss_get_mic_iov(OM_uint32 *min_stat, gss_ctx_id_t context_handle, + gss_qop_t qop_req, gss_iov_buffer_desc *iov, + int iov_count) nogil + + OM_uint32 gss_get_mic_iov_length(OM_uint32 *min_stat, + gss_ctx_id_t context_handle, + gss_qop_t qop_req, + gss_iov_buffer_desc *iov, + int iov_count) nogil + + OM_uint32 gss_verify_mic_iov(OM_uint32 *min_stat, + gss_ctx_id_t context_handle, + gss_qop_t *qop_state, + gss_iov_buffer_desc *iov, + int iov_count) nogil + + OM_uint32 GSS_IOV_BUFFER_TYPE_MIC_TOKEN + + +@six.add_metaclass(_EnumExtension) +class IOVBufferType(object): + __base__ = ext_dce.IOVBufferType + mic_token = GSS_IOV_BUFFER_TYPE_MIC_TOKEN + + +IOV.AUTO_ALLOC_BUFFERS.add(IOVBufferType.mic_token) + + +def get_mic_iov(SecurityContext context not None, IOV message not None, + qop=None): + """ + Generate MIC tokens for the given IOV message + + This method generates a MIC token for the given IOV message, and places it + in the :attr:`IOVBufferType.mic_token` buffer in the IOV. This method + operates entirely in-place, and returns nothing. + + Args: + context (SecurityContext): the current security context + message (list): a list of :class:`IOVBuffer` objects + qop (int): the desired Quality of Protection + (or None for the default QoP) + + Raises: + GSSError + """ + + cdef gss_qop_t qop_req = qop if qop is not None else GSS_C_QOP_DEFAULT + + cdef gss_iov_buffer_desc *res_arr = message.__cvalue__() + + cdef OM_uint32 maj_stat, min_stat + + with nogil: + maj_stat = gss_get_mic_iov(&min_stat, context.raw_ctx, qop_req, + res_arr, message.iov_len) + + if maj_stat == GSS_S_COMPLETE: + message.c_changed = True + return + else: + raise GSSError(maj_stat, min_stat) + + +def get_mic_iov_length(SecurityContext context not None, IOV message not None, + qop=None): + """ + Allocate space for the MIC buffer in the given IOV message + + This method allocates space for the MIC token buffer + (:attr:`IOVBufferType.mic_token`) in the given IOV message. + + Args: + context (SecurityContext): the current security context + message (list): a list of :class:`IOVBuffer` objects + qop (int): the desired Quality of Protection + (or None for the default QoP) + + Raises: + GSSError + """ + + cdef gss_qop_t qop_req = qop if qop is not None else GSS_C_QOP_DEFAULT + + cdef gss_iov_buffer_desc *res_arr = message.__cvalue__() + + cdef OM_uint32 maj_stat, min_stat + + with nogil: + maj_stat = gss_get_mic_iov_length(&min_stat, context.raw_ctx, qop_req, + res_arr, message.iov_len) + + if maj_stat == GSS_S_COMPLETE: + message.c_changed = True + return + else: + raise GSSError(maj_stat, min_stat) + + +def verify_mic_iov(SecurityContext context not None, IOV message not None, + qop=None): + """ + Verify that the MIC matches the data in the given IOV message + + This method verifies that the MIC token in the MIC buffer + (:attr:`IOVBufferType.mic_token`) match the data buffer(s) + in the given IOV method. + + Args: + context (SecurityContext): the current security context + message (list): a list of :class:`IOVBuffer` objects + + Returns: + int: the QoP used to generate the MIC token + + Raises: + GSSError + """ + + cdef gss_iov_buffer_desc *res_arr = message.__cvalue__() + + cdef gss_qop_t qop_state + + cdef OM_uint32 maj_stat, min_stat + + with nogil: + maj_stat = gss_verify_mic_iov(&min_stat, context.raw_ctx, &qop_state, + res_arr, message.iov_len) + + if maj_stat == GSS_S_COMPLETE: + return qop_state + else: + raise GSSError(maj_stat, min_stat) diff --git a/gssapi/raw/misc.pyx b/gssapi/raw/misc.pyx index 9fe6f834..c098b535 100644 --- a/gssapi/raw/misc.pyx +++ b/gssapi/raw/misc.pyx @@ -11,6 +11,8 @@ from gssapi.raw.oids cimport OID from gssapi.raw.types import MechType +from enum import EnumMeta, Enum + cdef extern from "python_gssapi.h": OM_uint32 gss_display_status(OM_uint32 *minor_status, @@ -329,3 +331,64 @@ class GSSError(Exception): maj_str=maj_str, min_stat=self.min_code, min_str=min_str) + + +# WARNING: FOR YOUR OWN PERSONAL SANITY, DO NOT TRY THIS AT HOME +# This allows things like extensions declaring their own RequirementFlags +# without having to worry about import order. There *might* be a cleaner +# way to do this, but most of the ways probably exploit the interals of Enum +class _EnumExtension(EnumMeta): + def __new__(metacls, name, bases, classdict): + # find the base class that this overrides + + base_attr = classdict.get('__base__', None) + + if len(bases) != 1 or bases[0] is not object or base_attr is None: + raise TypeError("Enumeration extensions must be created as " + "`ClassName(object)` and must contain the " + "`__base__` attribute") + + adds_to = base_attr + del classdict['__base__'] + + bases = adds_to.__bases__ + + # we need to have the same Enum type + if not issubclass(adds_to, Enum): + raise TypeError("Enumeration extensions must extend a subtype " + "of Enum or directly sublcass object") + + # roughly preserve insertion order in Python 3.4+ + # (actually preserving order would require using parts of the Enum + # implementation that aren't part of the spec) + old_classdict = classdict + classdict = metacls.__prepare__(name, bases) + + # NB(directxman12): we can't use update, because that doesn't + # trigger __setitem__, which thus won't update the _EnumDict correctly + base_members = adds_to.__members__ + for k, v in base_members.items(): + classdict[k] = v.value + + for k, v in old_classdict.items(): + if k in base_members: + raise AttributeError("Enumeration extensions cannot override " + "existing enumeration members") + + classdict[k] = v + + res = super(_EnumExtension, metacls).__new__(metacls, name, + bases, classdict) + + # replace the old with the new + for k, v in res.__dict__.items(): + if (k not in ('__module__', '__qualname__', '__doc__') and + k in adds_to.__dict__): + setattr(adds_to, k, v) + + # preserve enum semantics + for member in adds_to.__members__.values(): + member.__class__ = adds_to + + # always return the original + return adds_to diff --git a/gssapi/raw/named_tuples.py b/gssapi/raw/named_tuples.py index 6b918cee..4f52b103 100644 --- a/gssapi/raw/named_tuples.py +++ b/gssapi/raw/named_tuples.py @@ -53,5 +53,10 @@ 'lifetime', 'mech', 'flags', 'locally_init', 'complete']) + StoreCredResult = namedtuple('StoreCredResult', ['mechs', 'usage']) + + +IOVUnwrapResult = namedtuple('IOVUnwrapResult', + ['encrypted', 'qop']) diff --git a/gssapi/tests/test_raw.py b/gssapi/tests/test_raw.py index 9e7eebe6..27982683 100644 --- a/gssapi/tests/test_raw.py +++ b/gssapi/tests/test_raw.py @@ -5,6 +5,8 @@ import unittest import should_be.all # noqa +from enum import IntEnum +import six import gssapi.raw as gb import gssapi.raw.misc as gbmisc @@ -641,6 +643,78 @@ def test_xor_set(self): fset3.should_include(gb.RequirementFlag.out_of_sequence_detection) +class TestEnumExtension(unittest.TestCase): + def setUp(self): + class ExtendedEnum(IntEnum): + """some docs""" + a = 1 + b = 2 + c = 3 + + class OtherEnum(IntEnum): + a = 1 + b = 2 + c = 3 + + self._ext_enum = ExtendedEnum + self._plain_enum = OtherEnum + + def test_same_dict(self): + @six.add_metaclass(gbmisc._EnumExtension) + class ExtendedEnum(object): + __base__ = self._ext_enum + d = 4 + e = 5 + + self._ext_enum.__dict__.keys().should_be( + self._plain_enum.__dict__.keys()) + + def test_members_copy_mult(self): + @six.add_metaclass(gbmisc._EnumExtension) + class ExtendedEnum(object): + __base__ = self._ext_enum + d = 4 + + @six.add_metaclass(gbmisc._EnumExtension) + class ExtendedEnum2(object): + __base__ = self._ext_enum + e = 5 + + self._ext_enum.__members__.should_include( + ['a', 'b', 'c', 'd', 'e']) + + self._ext_enum.a.should_be(1) + self._ext_enum.b.should_be(2) + self._ext_enum.c.should_be(3) + self._ext_enum.d.should_be(4) + self._ext_enum.e.should_be(5) + + def test_all_class_are_same(self): + @six.add_metaclass(gbmisc._EnumExtension) + class ExtendedEnum(object): + __base__ = self._ext_enum + d = 4 + + assert ExtendedEnum is self._ext_enum + + def test_members_are_still_instance(self): + @six.add_metaclass(gbmisc._EnumExtension) + class EnumExt(object): + __base__ = self._ext_enum + d = 4 + + self._ext_enum.a.should_be_a(self._ext_enum) + self._ext_enum.d.should_be_a(self._ext_enum) + + def test_doc_is_preserved(self): + @six.add_metaclass(gbmisc._EnumExtension) + class ExtendedEnum(object): + __base__ = self._ext_enum + d = 4 + + self._ext_enum.__doc__.should_be('some docs') + + class TestInitContext(_GSSAPIKerberosTestCase): def setUp(self): self.target_name = gb.import_name(TARGET_SERVICE_NAME, @@ -870,6 +944,197 @@ def test_basic_wrap_unwrap(self): unwrapped_message.shouldnt_be_empty() unwrapped_message.should_be(b'test message') + @_extension_test('dce', 'DCE (IOV/AEAD)') + def test_basic_iov_wrap_unwrap_prealloc(self): + init_data = b'some encrypted data' + init_other_data = b'some other encrypted data' + init_signed_info = b'some sig data' + init_message = gb.IOV((gb.IOVBufferType.sign_only, init_signed_info), + init_data, init_other_data, auto_alloc=False) + + init_message[0].allocate.should_be_false() + init_message[4].allocate.should_be_false() + init_message[5].allocate.should_be_false() + + conf = gb.wrap_iov_length(self.client_ctx, init_message) + + conf.should_be_a(bool) + conf.should_be_true() + + init_message[0].should_be_at_least_size(1) + init_message[5].should_be_at_least_size(1) + + conf = gb.wrap_iov(self.client_ctx, init_message) + + conf.should_be_a(bool) + conf.should_be_true() + + # make sure we didn't strings used + init_data.should_be(b'some encrypted data') + init_other_data.should_be(b'some other encrypted data') + init_signed_info.should_be(b'some sig data') + + init_message[2].value.shouldnt_be(b'some encrypted data') + init_message[3].value.shouldnt_be(b'some other encrypted data') + + (conf, qop) = gb.unwrap_iov(self.server_ctx, init_message) + + conf.should_be_a(bool) + conf.should_be_true() + + qop.should_be_a(int) + + init_message[1].value.should_be(init_signed_info) + init_message[2].value.should_be(init_data) + init_message[3].value.should_be(init_other_data) + + @_extension_test('dce', 'DCE (IOV/AEAD)') + def test_basic_iov_wrap_unwrap_autoalloc(self): + init_data = b'some encrypted data' + init_other_data = b'some other encrypted data' + init_signed_info = b'some sig data' + init_message = gb.IOV((gb.IOVBufferType.sign_only, init_signed_info), + init_data, init_other_data) + + conf = gb.wrap_iov(self.client_ctx, init_message) + + conf.should_be_a(bool) + conf.should_be_true() + + # make sure we didn't strings used + init_data.should_be(b'some encrypted data') + init_other_data.should_be(b'some other encrypted data') + init_signed_info.should_be(b'some sig data') + + init_message[2].value.shouldnt_be(b'some encrypted data') + init_message[3].value.shouldnt_be(b'some other encrypted data') + + (conf, qop) = gb.unwrap_iov(self.server_ctx, init_message) + + conf.should_be_a(bool) + conf.should_be_true() + + qop.should_be_a(int) + + init_message[1].value.should_be(init_signed_info) + init_message[2].value.should_be(init_data) + init_message[3].value.should_be(init_other_data) + + @_extension_test('dce', 'DCE (IOV/AEAD)') + def test_basic_aead_wrap_unwrap(self): + assoc_data = b'some sig data' + (wrapped_message, conf) = gb.wrap_aead(self.client_ctx, + b'test message', assoc_data) + + conf.should_be_a(bool) + conf.should_be_true() + + wrapped_message.should_be_a(bytes) + wrapped_message.shouldnt_be_empty() + wrapped_message.should_be_longer_than('test message') + + (unwrapped_message, conf, qop) = gb.unwrap_aead(self.server_ctx, + wrapped_message, + assoc_data) + conf.should_be_a(bool) + conf.should_be_true() + + qop.should_be_an_integer() + qop.should_be_at_least(0) + + unwrapped_message.should_be_a(bytes) + unwrapped_message.shouldnt_be_empty() + unwrapped_message.should_be(b'test message') + + @_extension_test('dce', 'DCE (IOV/AEAD)') + def test_basic_aead_wrap_unwrap_no_assoc(self): + (wrapped_message, conf) = gb.wrap_aead(self.client_ctx, + b'test message') + + conf.should_be_a(bool) + conf.should_be_true() + + wrapped_message.should_be_a(bytes) + wrapped_message.shouldnt_be_empty() + wrapped_message.should_be_longer_than('test message') + + (unwrapped_message, conf, qop) = gb.unwrap_aead(self.server_ctx, + wrapped_message) + conf.should_be_a(bool) + conf.should_be_true() + + qop.should_be_an_integer() + qop.should_be_at_least(0) + + unwrapped_message.should_be_a(bytes) + unwrapped_message.shouldnt_be_empty() + unwrapped_message.should_be(b'test message') + + @_extension_test('dce', 'DCE (IOV/AEAD)') + def test_basic_aead_wrap_unwrap_bad_assoc_raises_error(self): + assoc_data = b'some sig data' + (wrapped_message, conf) = gb.wrap_aead(self.client_ctx, + b'test message', assoc_data) + + conf.should_be_a(bool) + conf.should_be_true() + + wrapped_message.should_be_a(bytes) + wrapped_message.shouldnt_be_empty() + wrapped_message.should_be_longer_than('test message') + + gb.unwrap_aead.should_raise(gb.BadMICError, self.server_ctx, + wrapped_message, b'some other sig data') + + @_extension_test('iov_mic', 'IOV MIC') + def test_get_mic_iov(self): + init_message = gb.IOV(b'some data', + (gb.IOVBufferType.sign_only, b'some sig data'), + gb.IOVBufferType.mic_token, std_layout=False) + + gb.get_mic_iov(self.client_ctx, init_message) + + init_message[2].type.should_be(gb.IOVBufferType.mic_token) + init_message[2].value.shouldnt_be_empty() + + @_extension_test('iov_mic', 'IOV MIC') + def test_basic_verify_mic_iov(self): + init_message = gb.IOV(b'some data', + (gb.IOVBufferType.sign_only, b'some sig data'), + gb.IOVBufferType.mic_token, std_layout=False) + + gb.get_mic_iov(self.client_ctx, init_message) + + init_message[2].type.should_be(gb.IOVBufferType.mic_token) + init_message[2].value.shouldnt_be_empty() + + qop_used = gb.verify_mic_iov(self.server_ctx, init_message) + + qop_used.should_be_an_integer() + + @_extension_test('iov_mic', 'IOV MIC') + def test_verify_mic_iov_bad_mic_raises_error(self): + init_message = gb.IOV(b'some data', + (gb.IOVBufferType.sign_only, b'some sig data'), + (gb.IOVBufferType.mic_token, 'abaava'), + std_layout=False) + + # test a bad MIC + gb.verify_mic_iov.should_raise(gb.GSSError, self.server_ctx, + init_message) + + @_extension_test('iov_mic', 'IOV MIC') + def test_get_mic_iov_length(self): + init_message = gb.IOV(b'some data', + (gb.IOVBufferType.sign_only, b'some sig data'), + gb.IOVBufferType.mic_token, std_layout=False, + auto_alloc=False) + + gb.get_mic_iov_length(self.client_ctx, init_message) + + init_message[2].type.should_be(gb.IOVBufferType.mic_token) + init_message[2].value.shouldnt_be_empty() + TEST_OIDS = {'SPNEGO': {'bytes': b'\053\006\001\005\005\002', 'string': '1.3.6.1.5.5.2'}, diff --git a/setup.py b/setup.py index 1cedc79c..ad952e52 100755 --- a/setup.py +++ b/setup.py @@ -62,7 +62,7 @@ def _get_output(*args, **kwargs): compile_args = get_output('krb5-config --cflags gssapi') link_args = link_args.split() -compile_args = compile_args.split() +compile_args = compile_args.split() + ['-g'] ENABLE_SUPPORT_DETECTION = \ (os.environ.get('GSSAPI_SUPPORT_DETECT', 'true').lower() == 'true') @@ -191,6 +191,8 @@ def gssapi_modules(lst): extension_file('cred_store', 'gss_store_cred_into'), extension_file('rfc5588', 'gss_store_cred'), extension_file('cred_imp_exp', 'gss_import_cred'), + extension_file('dce', 'gss_wrap_iov'), + extension_file('iov_mic', 'gss_get_mic_iov'), # see ext_password{,_add}.pyx for more information on this split extension_file('password', 'gss_acquire_cred_with_password'),