Après la présentation des concepts, nous allons nous attaquer au module « threading » de Python… Le module permet d’utiliser le multithreading préemptif de manière assez simple. Nous nous concentrerons sur l’API disponible depuis la version 3.10 de Python.
GIL
On ne peut pas parler du multithreading en Python sans parler du GIL (Global Interpreter Lock). La gestion du multithreading a été ajoutée en 1992 par Guido Van Rossum. Il a alors intégré un verrou interne à l’interpréteur qui est, par la suite, devenu un frein historique à l’utilisation des ressources des machines multi-cœurs. Au commencement, le GIL était un atout car il permettait aux développeurs de ne pas trop se soucier de la gestion des synchronisations lors de l’écriture des extensions en C. L’interpréteur se sert du GIL pour protéger tous les objets des accès concurrentiels notamment pour protéger le compteur de références et donc la gestion de la mémoire. Et donc, du fait de la présence de ce verrou, Python n’utilise pas correctement les cœurs des processeurs actuels du fait d’un excès de synchronisation ☹.
De nombreuses tentatives pour retirer le GIL ont été faites par le passé et ont échoué. Récemment, Guido Van Rossum a rejoint Microsoft avec pour objectif fixé d’améliorer les performances de Python. La version 3.11 de Python est une première version intégrant de substantielles améliorations.
EXEMPLE DE CRÉATION DE THREADS
Pour faire du multithreading, la première étape consiste à savoir créer des threads. L’objectif est d’exécuter des acteurs qui peuvent être des fonctions ou des objets.
Acteur = fonction
Le plus simple est de commencer par un exemple simple : 4 threads écrivant 3000 fois des textes différents (des caractères allant de 0 à 3) dans la console.
from threading import Thread # 1) la function à exécuter def actor( text, count ): for i in range( count ): print( text, end="" ) if __name__ == "__main__": # 2) initialisation actors = [ Thread( target = actor, args = ( text, 3000 ) ) for text in "0123" ] # 3) lancement des threads for thread in actors: thread.start() # 4) attente que tous les threads soient finis for thread in actors: thread.join() # finalize print( "nLes threads sont terminésn" )
Il y a donc 4 étapes :
- Créer la fonction « actor » qui devra être exécutée par les threads ; en l’occurrence nous allons afficher « n » fois un texte dans la console
- Initialiser les threads : définir quelle fonction devra être lancée avec quels paramètres. A ce stade le thread n’est pas créé et la fonction « actor » n’est pas exécutée.
- Démarrer les threads : on les ajoute dans l’ordonnanceur et les fonctions « actor » sont exécutées
- Attendre la fin de tous les threads
La classe « Thread » est l’objet en Python qui permet de créer et gérer les threads. Nous expliquerons dans le détail les API de la classe dans un autre paragraphe. Le défaut de cette technique est que si la fonction « actor » retournait un résultat important pour la suite du programme, on n’y aurait pas accès.
Acteur = objet
« Thread » est une classe que l’on peut spécialiser, cela permet de profiter de la capacité des classes de stocker des données comme variables membres. Ce faisant, on pourra, à la fin de l’exécution, récupérer le résultat pour l’exploiter.
from threading import Thread class Actor( Thread ): def __init__( self, text, count ): Thread.__init__( self ) self.text = text self.count = count self.size = 0 def run( self ): for i in range( self.count ): print( self.text, end = "" ) self.size = len( self.text ) * self.count if __name__ == "__main__": # initialisation actors = [ Actor( text, 3000 ) for text in "0123" ] # lancement des threads for thread in actors: thread.start() # attente que tous les threads aient terminés for thread in actors: thread.join() # affichage des résultats print() for thread in actors: print( F"ID({thread.name}) Text('{thread.text}') Size({thread.size})" ) # finalize print( "nLes threads sont terminésn" )
Il y a donc 4 étapes :
- Créer la classe « Actor » qui devra être utilisée comme threads en surchargeant la fonction « run » qui sera l’action exécutée par le thread; en l’occurrence nous allons afficher « n » fois un texte dans la console
- Initialiser les threads : définir quelle instance de classe sera lancée avec quels paramètres. A ce stade le thread n’est pas créé et la fonction « Actor.run » n’est pas exécutée.
- Démarrer les threads : on les ajoutent dans l’ordonnanceur et les fonctions « actor » sont exécutées
- Attendre la fin de tous les threads
API DE LA CLASSE THREAD
Maintenant que l’on a vu, par des exemples, comment créer des threads selon les deux options possibles, on sait que la classe Thread est essentielle et que maîtriser son API est nécessaire.
Constructeur
Utile pour utiliser un acteur sous forme de fonction, l’interface du constructeur est :
class threading.Thread( group = None, target = None, name = None, args = (), kwargs = {}, *, Daemon = None )
Pour démarrer un thread en mode fonctionnel, il nous faut passer les arguments suivants :
- « target » : on lui passe la fonction à exécuter
- « args » : on y stocke le tuple contenant les arguments positionnels à passer à « target »
- « kwargs » : on y stocke le dicitionnaire des paramètres nommés à passer à « target »
« name » permet de nommer le thread pour le reconnaitre. Si on le laisse à None, par défaut l’API génèrera un nom du type « Thread-N » avec N entier.
« group » n’a pour l’instant pas d’utilité, il est réservé pour un usage futur.
« Daemon » permet de définir si ce thread sera un démon ou non. Cette valeur doit être définie avant l’appel à la fonction « start ». L’interpréteur Python s’arrête dès que tous les threads non-démons sont terminés.
start()
Cette function démarre le thread été exécute l’action. Elle ne peut être appelée qu’une fois par objet Thread, une exception RuntimeError sera levée en cas de second appel..
join( timeout = None )
Attend jusqu’à ce que le thread soit terminé (si timeout est None). Pour définir timeout, il faut fournir une valeur flottante correspondant au temps maximum en seconde à attendre. La fonction « join » ne retournant pas de valeur, il faudra utiliser la fonction « is_alive ».
run()
C’est la fonction qui est appelée par « start ». Si on veut créer notre classe de Thread spécialisée, il faut surcharger cette fonction.
name
Propriété permettant de lire/écrire le nom du thread.
daemon
Propriété qui permet de savoir si ce thread est un démon ou non.
is_alive()
Retourne True si le thread est en vie (continue de s’exécuter).
native_id
Identifiant du thread affecté par l’OS.
LES PRIMITIVES DE SYNCHRONISATION
Les verrous Lock et RLock sont les plus usités mais ce n’est pas une raison de négliger les autres qui ont leur utilité.
Lock
C’est la classe Python correspondant au concept de verrou (mutex). L’implémentation interne utilise le verrou concret le plus efficace fourni sur la plateforme. L’interface est simple :
- acquire( blocking = True, timeout = -1 )
attend jusqu’à acquérir le mutex si blocking est True. Si blocking est False, il n’y d’attente que jusqu’à ce que le timeout (valeur flottante en seconde) expire. La fonction retourne True si le verrou a été pris, False sinon. - release()
libère le verrou - locked()
retourne True si le verrou est pris.
Voici un exemple d’utilisation du mutex pour garantir la libération du verrou :
from threading import Lock m = Lock() m.acquire() try: action() finally: m.release()
Grâce à la gestion de contexte, on dispose d’une méthode plus simple :
from threading import Lock … m = Lock() … with m: action()
RLock
C’est la classe Python correspondant au concept de verrou récursif ou réentrant (recursive mutex). L’implémentation interne utilise le verrou récursif concret le plus efficace fourni sur la plateforme. L’interface est la même que pour la classe Lock.
Timer
Timer est une sous-classe de la classe Thread. Elle déclenche un appel de fonction au bout d’un temps déterminé à l’avance.
L’interface est simple :
- Timer(interval, function, args=None, kwargs=None)
Appellera “function(*args, **kwargs)” après “interval” seconds. C’est une initialization, le compte à rebours n’est pas lancé. - start()
démarre le timer/compte à rebours - cancel()
stoppe le timer durant la période d’attente - function
variable membre contenant la fonction à appeler après l’intervalle - args, kwargs
variables membres contenant les paramètres à passer à la fonction après l’intervalle - interval
variable contenant le temps à attendre - finished
variable membre contenant un objet de type threading.Event que l’on attend pour déclencher l’appel de la fonction
Semaphore
Le concept du sémaphore de Dijkstra pour Python. Les noms des fonctions ont été homogénéisées pour faciliter l’apprentissage. L’interface est la suivante :
- Semaphore( value = 1 )
constructeur qui prend la valeur initiale du compteur de ressources disponibles - acquire( blocking = True, timeout = None )
attend jusqu’à acquérir une ressource du sémaphore si blocking est True. Si blocking est False, il n’y d’attente que jusqu’à ce que le timeout (valeur flottante en seconde) expire. La fonction retourne True si la ressource a été capturée, False sinon. - release( n = 1 )
libère n ressources du sémaphore
EXEMPLE DES PRODUCTEURS/CONSOMMATEURS
Nous avons un groupe de producteurs qui envoient des messages et nous avons un groupe de consommateurs qui vont dépiler les messages pour les traiter. Afin de synchroniser les producteurs et les consommateurs nous utiliserons une queue. Ce type de traitement est effectué dans des applications (notamment bancaires) avec des queues MQSeries ou Kafka.
Nos producteurs vont écrire un nombre (prévu à l’avance) de messages et s’arrêter. Par contre les consommateurs vont devoir tourner jusqu’à ce que nous leur disions de s’arrêter. Nous allons donc convenir d’un message de stop (une chaîne de caractères contenant le mot « STOP » par exemple) mis dans la queue qui provoquera l’arrêt du consommateur qui l’aura récupéré : il faudra donc poster un message par consommateur…
Queue
L’idée est de construire une queue FIFO (First In First Out) protégée contre les accès concurrents (ThreadSafe). Et nous allons le faire avec les outils basiques même si Python propose des outils prêt à l’emploi. Il nous faudra une liste et les messages seront insérés en tête (avec « l.insert( 0, msg ) ») et récupérer en queue (avec « l.pop() »).
class ThreadSafeQueue: def __init__( self ): self._lock = Lock() self._queue = [] def send( self, msg ): with self._lock: self._queue.insert( 0, msg ) def get_msg( self ): with self._lock: try: msg = self._queue.pop() except: msg = None return msg
Dans le constructeur, on crée deux variables membres « _lock » pour le verrou qui rendra l’objet thread safe et « _queue » pour contenir la liste de messages.
La fonction « send( msg ) » servira au producteurs pour envoyer les messages aux consommateurs.
La fonction « get_msg() » dépile un message et le retourne. Si aucun message n’est disponible, elle retournera None.
Gestion des I/O
Les consommateurs écriront les messages dans un fichier afin de pouvoir vérifier que le programme se comporte correctement et ne perd aucun message. Les entrées/sorties sont toujours un point problématique : les fichiers ou assimilés-fichiers sont rarement ThreadSafe. Là aussi nous devrons utiliser un verrou.
class Output: def __init__( self ): self._lock = Lock() self._file = open( "C:\Temp\output.txt", "w" ) def write( self, text ): with self._lock: print( text, file = self._file, flush = True )
Initialisation des structures communes
Il faut créer les données comme variables membres :
queue = ThreadSafeQueue() output = Output()
Le producteur
Nous gérerons les producteurs et consommateurs comme de simples fonctions. Le producteur aura une interface à deux paramètres : un identifiant et le nombre de messages à créer… Les messages seront des tuples de 3 valeurs : l’identifiant du producteur, un mot (MSG) et le numéro du message.
def producer( id, count ): for i in range( count ): queue.send( ( id, "MSG", i ) )
Le consommateur
Un consommateur consomme les messages de la queue jusqu’à obtenir un message de stop. Chaque message sera écrit dans le fichier de sortie (« output »). On a donc une boucle qui ne s’arrêtera que lorsqu’un message « STOP » arrivera.
def consumer( id ): do_loop = True while do_loop: msg = queue.get_msg() if msg: output.write( F"Consumer {id} received {msg}" ) do_loop = ( msg != "STOP" )
Lancement des threads
Un consommateur consomme :
# initialisation consumers = [ Thread( target = consumer, args = ( 1, ) ), Thread( target = consumer, args = ( 2, ) ) ] producers = [ Thread( target = producer, args = ( 1, 5000 ) ), Thread( target = producer, args = ( 2, 4000 ) ) ] # lancement des threads for thread in [ *producers, *consumers ]: thread.start() # attente de la fin des producteurs for thread in producers: thread.join() output.write( "nProducers finished!" ) # envoi des messages STOP aux consommateurs for thread in consumers: queue.send( "STOP" ) output.write( "nSTOP was sent to consumers!" ) # attente de la fin des consommateurs for thread in consumers: thread.join() output.write( "nIt’s finished!" )
EXEMPLE DE TIMER A REPETITION
L’objectif va être de créer un mécanisme capable d’appeler de façon répétée toutes les x secondes une fonction : pour mettre à jour une horloge par exemple. Pour obtenir ce résultat nous allons créer une classe qui héritera de la classe Timer.
RepeatTimer
Un timer à répétition, voilà la solution à notre problème. Il suffit de surcharger la fonction « run » de Timer en y ajoutant une boucle while :
from threading import Timer class RepeatTimer( Timer ): def run( self ): while not self.finished.wait( self.interval ): self.function( *self.args, **self.kwargs )
Affichage de l’horloge
L’action que nous souhaitons appeler périodiquement est l’affichage de l’horloge. Nous allons faire simple :
from time import sleep, localtime, strftime def print_clock(): print( strftime( "%H:%M:%S", localtime() ) )
Lancement du timer
Lancer le timer est facile :
# lancement timer = RepeatTimer( 0.5, print_clock ) timer.start()
Pouvoir l’arrêter lorsque l’on en a plus besoin peut être utile aussi :
# attente avant arrêt du timer sleep( 5 ) timer.cancel()
CONCLUSION
Nous vous avons présenté les principales API du module threading avec des exemples concrets de mise en œuvre vous permettant de mieux comprendre leur fonctionnement, mais sans couvrir toutes les possibilités (Event, Condition, etc…). J’espère vous avoir ouvert donner des idées pour vos prochaines réalisations.