from my_config_loader import MyConfigLoader
from my_logger import MyLogger
from os import listdir, devnull, fsdecode
from PyKCS11 import PyKCS11Lib, Mechanism, LowLevel
from asn1crypto.x509 import Certificate
####################################################################
# CONFIGURATION #
####################################################################
# driver directory
DRIVER_FOLDER = MyConfigLoader().get_server_config()["driver_folder"]
####################################################################
log = MyLogger().my_logger()
# custom exceptions
[documenti]class SmartCardConnectionError(ConnectionError):
""" Raised when something goes wrong with the smart card """
pass
[documenti]class SignatureUtils:
[documenti] @staticmethod
def fetch_smart_card_sessions():
""" Return a `session` list for the connected smart cards """
log.info("loading drivers")
pkcs11 = PyKCS11Lib()
driver_loaded = False
# try with default
try:
pkcs11.load()
driver_loaded = True
except:
log.warning("no default driver")
# anyway load known drivers
for file in listdir(DRIVER_FOLDER):
try:
pkcs11.load(file)
log.info(f"driver {fsdecode(file)} loaded")
driver_loaded = True
except:
log.warning(
f"driver {fsdecode(file)} NOT loaded")
continue
# cannot load any driver file
if not driver_loaded:
raise SmartCardConnectionError("No driver found")
slots = SignatureUtils._fetch_slots(pkcs11)
sessions = []
for slot in slots:
try:
session = pkcs11.openSession(
slot, LowLevel.CKS_RW_PUBLIC_SESSION)
sessions.append(session)
except:
continue
if len(sessions) < 1:
raise SmartCardConnectionError("Can not open any session")
return sessions
@staticmethod
def _fetch_slots(pkcs11_lib):
""" Return a `slot list` (connected Smart Cards) """
log.info("getting slots")
try:
slots = pkcs11_lib.getSlotList(tokenPresent=True)
if len(slots) < 1:
raise Exception() # only to get to the external except block
else:
return slots
except:
raise SmartCardConnectionError("No smart card slot found")
[documenti] @staticmethod
def close_session(session):
""" Close smart card `session` """
log.info("Close smart card session")
session.closeSession()
[documenti] @staticmethod
def user_login(sessions, pin):
"""
User login on a `session` using `pin`
Params:
sessions: smart card session list
pin: user pin
Returns:
the logged in session
"""
log.info("user login")
for session in sessions:
try:
session.login(pin)
return session
except:
continue
raise SmartCardConnectionError(
"Can not login on any sessions provided")
[documenti] @staticmethod
def user_logout(session):
"""
User logout from a `session`
Params:
session: smart card session
"""
log.info("user logout")
session.logout()
[documenti] @staticmethod
def fetch_certificate(session):
"""
Return smart card certificate
Params:
session: smart card session
"""
log.info("fetching certificate")
try:
certificates = session.findObjects(
[(LowLevel.CKA_CLASS, LowLevel.CKO_CERTIFICATE)])
except:
raise SmartCardConnectionError("Certificate not found")
certificate = None
log.info("picking non_repudiation certificate")
for cert in certificates:
cert_value = SignatureUtils.get_certificate_value(session, cert)
x509 = Certificate.load(cert_value)
try:
key_usage = x509.key_usage_value
if 'non_repudiation' in key_usage.native:
certificate = cert
break
except:
log.info("key usage not found, skip this certificate")
continue
if certificate is None:
log.info("non_repudiation certificate not found, pick the latest one")
last_certificate_index = len(certificates) - 1
certificate = certificates[last_certificate_index]
return certificate
[documenti] @staticmethod
def get_certificate_value(session, certificate):
"""
Return the value of `certificate`
Params:
session: smart card session
certificate: smart card certificate
"""
log.info("fetching certificate value")
try:
certificate_value = session.getAttributeValue(
certificate, [LowLevel.CKA_VALUE])[0]
except:
raise SmartCardConnectionError("Certificate has no valid value")
return bytes(certificate_value)
[documenti] @staticmethod
def get_certificate_issuer(session, certificate):
"""
Return the issuer of `certificate`
Params:
session: smart card session
certificate: smart card certificate
"""
log.info("fetching certificate issuer")
try:
certificate_issuer = session.getAttributeValue(
certificate, [LowLevel.CKA_ISSUER])[0]
except:
raise SmartCardConnectionError("Certificate has no valid issuer")
return bytes(certificate_issuer)
[documenti] @staticmethod
def get_certificate_serial_number(session, certificate):
"""
Return the serial number of `certificate`
Params:
session: smart card session
certificate: smart card certificate
"""
log.info("fetching certificate serial number")
try:
serial_number = session.getAttributeValue(
certificate, [LowLevel.CKA_SERIAL_NUMBER])[0]
except:
raise SmartCardConnectionError(
"Certificate has no valid serial number")
try:
int_serial_number = int.from_bytes(
serial_number, byteorder='big', signed=True)
except:
raise SmartCardConnectionError(
"Can not cast certificate serial number to integer")
return int_serial_number
[documenti] @staticmethod
def fetch_private_key(session, certificate):
"""
Return smart card private key reference
Params:
session: smart card session
certificate: certificate connected to the key
"""
log.info("fetching private key")
try:
# getting the certificate id
identifier = session.getAttributeValue(
certificate, [LowLevel.CKA_ID])[0]
# same as the key id
priv_key = session.findObjects([
(LowLevel.CKA_CLASS, LowLevel.CKO_PRIVATE_KEY),
(LowLevel.CKA_ID, identifier)])[0]
except:
raise SmartCardConnectionError(
"Certificate has no valid private key")
# if you don't print priv_key you get a sign general error -.-
print(priv_key, file=open(devnull, "w")) # to avoid general error
return priv_key
[documenti] @staticmethod
def fetch_public_key(session, certificate):
"""
Return smart card public key reference
Params:
session: smart card session
certificate: certificate connected to the key
"""
log.info("fetching public key")
try:
# getting the certificate id
identifier = session.getAttributeValue(
certificate, [LowLevel.CKA_ID])[0]
# same as the key id
pub_key = session.findObjects([
(LowLevel.CKA_CLASS, LowLevel.CKO_PUBLIC_KEY),
(LowLevel.CKA_ID, identifier)])[0]
except:
raise SmartCardConnectionError(
"Certificate has no valid public key")
return pub_key
[documenti] @staticmethod
def digest(session, content):
"""
Return `content` hash
Params:
session: smart card session
content: content to hash
"""
log.info("hashing content")
try:
digest = session.digest(content, Mechanism(LowLevel.CKM_SHA256))
except:
raise SmartCardConnectionError("Failed on digest content")
return bytes(digest)
[documenti] @staticmethod
def signature(session, priv_key, content):
"""
Sign `content` with `privKey` reference
Reurn:
signature in bytearray
Params:
session: smart card session.
privKey: reference to the smart card private key.
content: bytes to hash and sign
"""
log.info("signing content")
try:
signature = session.sign(priv_key, content, Mechanism(
LowLevel.CKM_SHA256_RSA_PKCS, None))
except:
raise SmartCardConnectionError("Failed on sign content")
return bytes(signature)