Solution du problème CircularBuffer¶
Vous trouverez ici la solution pour l'exercice suivant :
Exercice de base¶
Cet exercice étant un peu long, entrons directement dans le vif du sujet.
Il va nous falloir une classe, avec un index pour l'écriture, un autre pour la lecture ainsi que le buffer.
Note
Ici nous utiliserons une liste, mais dans une optique de performance, il faudrait utiliser un tableau NumPy par exemple, avec un type de donnée fixe.
Donc commençons à écrire notre classe;
class CircularBuffer:
def __init__(self, size):
self.__size = size
self.__buffer = [None] * size
self.__read_idx = None
self.__write_idx = 0
Remarquez les valeurs par défaut des index. L'index d'écriture est initialisé à 0 (normal), alors que l'index de lecture est lui initialisé a None (moins normal). En effet il nous faut un moyen de distinguer lorsque le buffer est plein de choses a lire et lorsqu'il est vide.
En effet, dans chacun de ces 2 cas, l'index de lecture et l'index d'ecriture pointent sur la même case. Donc, lorsqu'il n'y aura rien a lire, l'index de lecture sera mis à None.
0 <= write_index < size
read_index = None
0 <= write_index < size
read_index = write_index
Maintenant que nous avons les attributs de notre classe, il va falloir écrire la fonction write.
Pour cela il faut s'assurer de comprendre comment fonctionne le buffer circulaire. Il s'agit d'un tableau de valeures comme n'importe quel buffer, mais au lieu de s'arreter lorsque le buffer est plein, on recommence au debut. Donc lorsque l'on doit écrire dans un buffer qui est plein, on écrase les plus vieilles données.
Cela est du au fait qu'un circular buffer est souvent utilisé en context temps réél, ou perdre quelques données n'est pas très grave temps qu'on s'assure de trouver une réponse adapté dans la plupart des cas, quite à en rater un ou deux
Bref, il va donc falloir écrire en augmentant le pointeur d'écriture, et le faire bloucler: lorsqu'on écrit dans la dérnière case, on recommence au début.
def write(self, items: (int, list)):
if isinstance(items, (tuple, list)):
result = 0
for i in items:
result += self.write(i)
return result
else:
self.__buffer[self.__write_idx] = items
self.__write_idx = (self.__write_idx + 1) % self.__size
return 1
Tout d'abord, il faut distinguer si items est une liste (ou un tuple) ou un entier. Si c'est une liste, alors on va appeler plusieurs fois write avec chacun des elements. Il est possible d'optimiser, mais ici ce n'est pas notre propos.
Nous utilisons l'oerateur modulo qui s'écrit %, cela donne le reste de la division Euclidienne. Et donc en augmentant l'index et en applicant le modulo, on arrive à ce que l'on veut: un index qui boucle.
Maintenant il va falloir écrire la fonction read.
Si on y va de manière naive, voila ce que l'on peut écrire pour lire autant de données que demandées :
def read(self, nb_items: int = 1):
result = list()
for i in range(nb_items):
result.append(self.__buffer[self.__read_idx])
self.__read_idx = (self.__read_idx + 1) % self.__size
if len(result) == 1:
return result[0]
return tuple(result)
Arrivé a ce moment la, on peut commencer a lancer les test pour voir ce que cela donne. Malheureusement nous avons 2 tests qui échouent, parce que nous essayons de lire l'item a la position None dans notre buffer.
Effectiement, nous commençons par mettre l'attribut __read_idx à None dans le constructeur, mais nous ne le changeons jamais.
Nous allons y remedier:
def write(self, items: (int, list)):
if self.__read_idx is None:
self.__read_idx = self.__write_idx
if isinstance(items, (tuple, list)):
result = 0
for i in items:
result += self.write(i)
return result
else:
self.__buffer[self.__write_idx] = items
self.__write_idx = (self.__write_idx + 1) % self.__size
return 1
Et voilà, mais n'oublions pas non plus de remettre l'index de lecture a None lorsque le buffer a été complétement lu:
def read(self, nb_items: int = 1):
result = list()
for i in range(nb_items):
result.append(self.__buffer[self.__read_idx])
self.__read_idx = (self.__read_idx + 1) % self.__size
if self.__read_idx == self.__write_idx:
self.__read_idx = None
break
if len(result) == 1:
return result[0]
return tuple(result)
Et donc voila le code pour l'exercice de base:
class CircularBuffer(object):
def __init__(self, size: int):
self.__size = size
self.__buffer = [None] * size
self.__read_idx = None
self.__write_idx = 0
def write(self, items: (int, list)):
if self.__read_idx is None:
self.__read_idx = self.__write_idx
if isinstance(items, (tuple, list)):
result = 0
for i in items:
result += self.write(i)
return result
else:
self.__buffer[self.__write_idx] = items
self.__write_idx = (self.__write_idx + 1) % self.__size
return 1
def read(self, nb_items: int = 1):
result = list()
for i in range(nb_items):
result.append(self.__buffer[self.__read_idx])
self.__read_idx = (self.__read_idx + 1) % self.__size
if self.__read_idx == self.__write_idx:
self.__read_idx = None
break
if len(result) == 1:
return result[0]
return tuple(result)
Bonus 1¶
Ajoutons les 3 methodes demandées; 2 d'entre elles sont assez simples compte tenu de notre définition du début :
def is_empty(self):
return self.__read_idx is None
def is_full(self):
return self.__write_idx == self.__read_idx
None; et lorsque le buffer est plein, alors on a les deux pointeurs egaux.
Pour le nombre d'éléménts disponible pour la lecture, cela est un peu plus comppliquer car il faut gerer le fait que l'on boucle. Mais pour faire simple, on va regarder si le pointeur de lecture est à droite du pointeur d'écriture, dans ce cas il suffira de rajouter la taille du buffer afin de le remettre à gauche (le pointeur de lecture doit toujours suivre le pointeur d'ecriture) :
def get_available_items(self):
if self.is_empty():
return 0
if self.is_full():
return self.__size
if self.__read_idx > self.__write_idx:
return self.__write_idx + self.__size - self.__read_idx
return self.__write_idx - self.__read_idx
Bonus 1 bis¶
Nous allons maintenant nous plier à une (très) bonne pratique : donner une représentation textuelle de notre classe.
Pour cela, nous allons définir la dunder méehode __repr__
def __repr__(self):
return f"CircularBuffer(size={self.__size}, available={self.get_available_items()})"
Bonus 2¶
Il va falloir faire de notre buffer un iterable afin de pouvoir l'utiliser directement dans une boucle.
Afin de transformer notre buffer en iterable il va falloir lui ajouter 2 dunder fonctions.
Premierement il faut qu'il ait une fonction __iter__ qui retourne son iterator, qui sera lui même.
Ensuite, comme nous voulons en faire un iterator, il faut définir la fonction __next__ qui retourne un élément jusqu'à épuisement.
Voila ce que cela donne :
def __iter__(self):
return self
def __next__(self):
if self.is_empty():
raise StopIteration()
return self.read()
Grace à la magie de python, cela suffit a ce qu'on puisse utiliser notre buffer dans une boucle, c'est trop cool.
Bonus 3¶
Maintenant nous allons devoir gérer les dépassements, tnat en lecture qu'en ecriture...
Et c'est par la lecture que nous commençons.
Si on lit alors que le buffer est empty, alors il faudra soulever une exception:
def read(self, nb_items: int = 1):
result = list()
for i in range(nb_items):
if self.is_empty():
raise IndexError("No more item available for read in buffer")
result.append(self.__buffer[self.__read_idx])
self.__read_idx = (self.__read_idx + 1) % self.__size
if self.__read_idx == self.__write_idx:
self.__read_idx = None
break
if len(result) == 1:
return result[0]
return tuple(result)
Maintenant, il faut aussi gerer lorsque le buffer est plein et que l'on ecrit de nouveau. Il va devoir deplacer la tête de lecture dans ce cas la, car les anciennes valeures seront alors perdues :
def write(self, items: (int, list)):
if isinstance(items, (tuple, list)):
result = 0
for i in items:
result += self.write(i)
return result
else:
if self.is_full():
self.__read_idx = (self.__read_idx + 1) % self.__size
if self.__read_idx is None:
self.__read_idx = self.__write_idx
self.__buffer[self.__write_idx] = items
self.__write_idx = (self.__write_idx + 1) % self.__size
return 1
Et voila qui clos cet exercice assez long je l'avoue; mais sans réelle difficulté.
Voici donc l'integralité de ma solution:
class CircularBuffer(object):
def __init__(self, size: int):
self.__size = size
self.__buffer = [None] * size
self.__read_idx = None
self.__write_idx = 0
def write(self, items: (int, list)):
if isinstance(items, (tuple, list)):
result = 0
for i in items:
result += self.write(i)
return result
else:
if self.is_full():
self.__read_idx = (self.__read_idx + 1) % self.__size
if self.__read_idx is None:
self.__read_idx = self.__write_idx
self.__buffer[self.__write_idx] = items
self.__write_idx = (self.__write_idx + 1) % self.__size
return 1
def read(self, nb_items: int = 1):
result = list()
for i in range(nb_items):
if self.is_empty():
raise IndexError("No more item available for read in buffer")
result.append(self.__buffer[self.__read_idx])
self.__read_idx = (self.__read_idx + 1) % self.__size
if self.__read_idx == self.__write_idx:
self.__read_idx = None
break
if len(result) == 1:
return result[0]
return tuple(result)
def get_available_items(self):
if self.is_empty():
return 0
if self.is_full():
return self.__size
if self.__read_idx > self.__write_idx:
return self.__write_idx + self.__size - self.__read_idx
return self.__write_idx - self.__read_idx
def is_empty(self):
return self.__read_idx is None
def is_full(self):
return self.__write_idx == self.__read_idx
def __repr__(self):
return f"CircularBuffer(size={self.__size}, available={self.get_available_items()})"
def __iter__(self):
return self
def __next__(self):
if self.is_empty():
raise StopIteration()
return self.read()