Skip to content

Solution du problème CircularBuffer

Vous trouverez ici la solution pour l'exercice suivant :

CircularBuffer

Exercice de base

Cet exercice étant un peu long, entrons directement dans le vif du sujet.

Il va nous falloir une classe, avec un index pour l'écriture, un autre pour la lecture ainsi que le buffer.

Note

Ici nous utiliserons une liste, mais dans une optique de performance, il faudrait utiliser un tableau NumPy par exemple, avec un type de donnée fixe.

Donc commençons à écrire notre classe;

class CircularBuffer:
    def __init__(self, size):
        self.__size = size
        self.__buffer = [None] * size
        self.__read_idx = None
        self.__write_idx = 0

Remarquez les valeurs par défaut des index. L'index d'écriture est initialisé à 0 (normal), alors que l'index de lecture est lui initialisé a None (moins normal). En effet il nous faut un moyen de distinguer lorsque le buffer est plein de choses a lire et lorsqu'il est vide. En effet, dans chacun de ces 2 cas, l'index de lecture et l'index d'ecriture pointent sur la même case. Donc, lorsqu'il n'y aura rien a lire, l'index de lecture sera mis à None.

0 <= write_index < size
read_index = None
0 <= write_index < size
read_index = write_index

Maintenant que nous avons les attributs de notre classe, il va falloir écrire la fonction write.

Pour cela il faut s'assurer de comprendre comment fonctionne le buffer circulaire. Il s'agit d'un tableau de valeures comme n'importe quel buffer, mais au lieu de s'arreter lorsque le buffer est plein, on recommence au debut. Donc lorsque l'on doit écrire dans un buffer qui est plein, on écrase les plus vieilles données.

Cela est du au fait qu'un circular buffer est souvent utilisé en context temps réél, ou perdre quelques données n'est pas très grave temps qu'on s'assure de trouver une réponse adapté dans la plupart des cas, quite à en rater un ou deux

Bref, il va donc falloir écrire en augmentant le pointeur d'écriture, et le faire bloucler: lorsqu'on écrit dans la dérnière case, on recommence au début.

  def write(self, items: (int, list)):
    if isinstance(items, (tuple, list)):
      result = 0
      for i in items:
        result += self.write(i)
      return result
    else:
      self.__buffer[self.__write_idx] = items
      self.__write_idx = (self.__write_idx + 1) % self.__size
      return 1

Tout d'abord, il faut distinguer si items est une liste (ou un tuple) ou un entier. Si c'est une liste, alors on va appeler plusieurs fois write avec chacun des elements. Il est possible d'optimiser, mais ici ce n'est pas notre propos.

Nous utilisons l'oerateur modulo qui s'écrit %, cela donne le reste de la division Euclidienne. Et donc en augmentant l'index et en applicant le modulo, on arrive à ce que l'on veut: un index qui boucle.

Maintenant il va falloir écrire la fonction read. Si on y va de manière naive, voila ce que l'on peut écrire pour lire autant de données que demandées :

  def read(self, nb_items: int = 1):
    result = list()
    for i in range(nb_items):
        result.append(self.__buffer[self.__read_idx])
        self.__read_idx = (self.__read_idx + 1) % self.__size
    if len(result) == 1:
        return result[0]
    return tuple(result)

Arrivé a ce moment la, on peut commencer a lancer les test pour voir ce que cela donne. Malheureusement nous avons 2 tests qui échouent, parce que nous essayons de lire l'item a la position None dans notre buffer.

Effectiement, nous commençons par mettre l'attribut __read_idx à None dans le constructeur, mais nous ne le changeons jamais. Nous allons y remedier:

  def write(self, items: (int, list)):
    if self.__read_idx is None:
        self.__read_idx = self.__write_idx
    if isinstance(items, (tuple, list)):
      result = 0
      for i in items:
        result += self.write(i)
      return result
    else:
      self.__buffer[self.__write_idx] = items
      self.__write_idx = (self.__write_idx + 1) % self.__size
      return 1

Et voilà, mais n'oublions pas non plus de remettre l'index de lecture a None lorsque le buffer a été complétement lu:

def read(self, nb_items: int = 1):
    result = list()
    for i in range(nb_items):
        result.append(self.__buffer[self.__read_idx])
        self.__read_idx = (self.__read_idx + 1) % self.__size
        if self.__read_idx == self.__write_idx:
            self.__read_idx = None
            break
    if len(result) == 1:
        return result[0]
    return tuple(result)

Et donc voila le code pour l'exercice de base:

class CircularBuffer(object):
    def __init__(self, size: int):
        self.__size = size
        self.__buffer = [None] * size
        self.__read_idx = None
        self.__write_idx = 0

    def write(self, items: (int, list)):
        if self.__read_idx is None:
            self.__read_idx = self.__write_idx
        if isinstance(items, (tuple, list)):
          result = 0
          for i in items:
            result += self.write(i)
          return result
        else:
          self.__buffer[self.__write_idx] = items
          self.__write_idx = (self.__write_idx + 1) % self.__size
          return 1

    def read(self, nb_items: int = 1):
        result = list()
        for i in range(nb_items):
            result.append(self.__buffer[self.__read_idx])
            self.__read_idx = (self.__read_idx + 1) % self.__size
            if self.__read_idx == self.__write_idx:
                self.__read_idx = None
                break
        if len(result) == 1:
            return result[0]
        return tuple(result)

Bonus 1

Ajoutons les 3 methodes demandées; 2 d'entre elles sont assez simples compte tenu de notre définition du début :

  def is_empty(self):
      return self.__read_idx is None

  def is_full(self):
      return self.__write_idx == self.__read_idx
Et oui, lorsque rien n'est disponible pour la lecture, l'index de lecture vaut None; et lorsque le buffer est plein, alors on a les deux pointeurs egaux.

Pour le nombre d'éléménts disponible pour la lecture, cela est un peu plus comppliquer car il faut gerer le fait que l'on boucle. Mais pour faire simple, on va regarder si le pointeur de lecture est à droite du pointeur d'écriture, dans ce cas il suffira de rajouter la taille du buffer afin de le remettre à gauche (le pointeur de lecture doit toujours suivre le pointeur d'ecriture) :

def get_available_items(self):
    if self.is_empty():
        return 0
    if self.is_full():
        return self.__size
    if self.__read_idx > self.__write_idx:
        return self.__write_idx + self.__size - self.__read_idx
    return self.__write_idx - self.__read_idx

Bonus 1 bis

Nous allons maintenant nous plier à une (très) bonne pratique : donner une représentation textuelle de notre classe.

Pour cela, nous allons définir la dunder méehode __repr__

  def __repr__(self):
      return f"CircularBuffer(size={self.__size}, available={self.get_available_items()})"

Bonus 2

Il va falloir faire de notre buffer un iterable afin de pouvoir l'utiliser directement dans une boucle.

Afin de transformer notre buffer en iterable il va falloir lui ajouter 2 dunder fonctions. Premierement il faut qu'il ait une fonction __iter__ qui retourne son iterator, qui sera lui même. Ensuite, comme nous voulons en faire un iterator, il faut définir la fonction __next__ qui retourne un élément jusqu'à épuisement.

Voila ce que cela donne :

    def __iter__(self):
        return self

    def __next__(self):
        if self.is_empty():
            raise StopIteration()
        return self.read()

Grace à la magie de python, cela suffit a ce qu'on puisse utiliser notre buffer dans une boucle, c'est trop cool.

Bonus 3

Maintenant nous allons devoir gérer les dépassements, tnat en lecture qu'en ecriture... Et c'est par la lecture que nous commençons. Si on lit alors que le buffer est empty, alors il faudra soulever une exception:

  def read(self, nb_items: int = 1):
    result = list()
    for i in range(nb_items):
        if self.is_empty():
            raise IndexError("No more item available for read in buffer")
        result.append(self.__buffer[self.__read_idx])
        self.__read_idx = (self.__read_idx + 1) % self.__size
        if self.__read_idx == self.__write_idx:
            self.__read_idx = None
            break
    if len(result) == 1:
        return result[0]
    return tuple(result)

Maintenant, il faut aussi gerer lorsque le buffer est plein et que l'on ecrit de nouveau. Il va devoir deplacer la tête de lecture dans ce cas la, car les anciennes valeures seront alors perdues :

  def write(self, items: (int, list)):
    if isinstance(items, (tuple, list)):
      result = 0
      for i in items:
        result += self.write(i)
      return result
    else:
      if self.is_full():
          self.__read_idx = (self.__read_idx + 1) % self.__size
      if self.__read_idx is None:
          self.__read_idx = self.__write_idx
      self.__buffer[self.__write_idx] = items
      self.__write_idx = (self.__write_idx + 1) % self.__size
      return 1

Et voila qui clos cet exercice assez long je l'avoue; mais sans réelle difficulté.

Voici donc l'integralité de ma solution:

class CircularBuffer(object):
    def __init__(self, size: int):
        self.__size = size
        self.__buffer = [None] * size
        self.__read_idx = None
        self.__write_idx = 0

    def write(self, items: (int, list)):
        if isinstance(items, (tuple, list)):
            result = 0
            for i in items:
                result += self.write(i)
            return result
        else:
            if self.is_full():
                self.__read_idx = (self.__read_idx + 1) % self.__size
            if self.__read_idx is None:
                self.__read_idx = self.__write_idx
            self.__buffer[self.__write_idx] = items
            self.__write_idx = (self.__write_idx + 1) % self.__size
            return 1

    def read(self, nb_items: int = 1):
        result = list()
        for i in range(nb_items):
            if self.is_empty():
                raise IndexError("No more item available for read in buffer")
            result.append(self.__buffer[self.__read_idx])
            self.__read_idx = (self.__read_idx + 1) % self.__size
            if self.__read_idx == self.__write_idx:
                self.__read_idx = None
                break
        if len(result) == 1:
            return result[0]
        return tuple(result)

    def get_available_items(self):
        if self.is_empty():
            return 0
        if self.is_full():
            return self.__size
        if self.__read_idx > self.__write_idx:
            return self.__write_idx + self.__size - self.__read_idx
        return self.__write_idx - self.__read_idx

    def is_empty(self):
        return self.__read_idx is None

    def is_full(self):
        return self.__write_idx == self.__read_idx

    def __repr__(self):
        return f"CircularBuffer(size={self.__size}, available={self.get_available_items()})"

    def __iter__(self):
        return self

    def __next__(self):
        if self.is_empty():
            raise StopIteration()
        return self.read()