Big / Bug Data: Analyse du code source Apache Flink

image1.png


Les applications Big Data traitent d'énormes quantités d'informations, souvent en temps réel. Naturellement, ces applications doivent être hautement fiables afin qu'aucune erreur dans le code ne puisse interférer avec le traitement des données. Pour atteindre une fiabilité élevée, il est nécessaire de suivre de près la qualité du code de projets développé pour ce domaine. L'analyseur statique PVS-Studio traite ce problème. Aujourd'hui, le projet Apache Flink développé par l'Apache Software Foundation, l'un des leaders du marché des logiciels Big Data, a été choisi comme sujet de test pour l'analyseur.



Qu'est-ce qu'Apache Flink? Il s'agit d'un framework open-source pour le traitement distribué de grandes quantités de données. Il a été développé comme alternative à Hadoop MapReduce en 2010 à l'Université technique de Berlin. Le cadre est basé sur un moteur d'exécution distribué pour les applications de traitement de données par lots et par flux. Ce moteur est écrit en langages Java et Scala. Aujourd'hui, Apache Flink peut être utilisé dans des projets écrits en Java, Scala, Python et même SQL.



Analyse de projet



Après avoir téléchargé le code source du projet, j'ai commencé à construire le projet avec la commande 'mvn clean package -DskipTests' spécifiée dans les instructions sur GitHub . Pendant que l'assemblage était en cours, à l'aide de l'utilitaire CLOC , j'ai découvert qu'il y avait 10838 fichiers Java dans le projet, qui contiennent environ 1,3 million de lignes de code. De plus, il y avait 3833 fichiers de test Java, soit plus du tiers de tous les fichiers Java. J'ai également remarqué que le projet utilise l'analyseur de code statique FindBugs et l'utilitaire Cobertura, qui fournit des informations sur la couverture du code par les tests. Avec tout cela à l'esprit, il devient clair que les développeurs d'Apache Flink ont ​​fait attention à la qualité du code et à la couverture des tests pendant le développement.



Après une compilation réussie, j'ai ouvert le projet dans IntelliJ IDEA et exécuté l'analyse à l'aide du plugin PVS-Studio for IDEA et Android Studio . Les avertissements de l'analyseur ont été distribués comme suit:



  • 183 haut;
  • 759 moyen;
  • 545 Faible.


Environ 2/3 des déclencheurs de l'analyseur PVS-Studio ont été affectés à des fichiers de test. Compte tenu de ce fait et de la taille de la base de code du projet, nous pouvons dire que les développeurs Apache Flink ont ​​réussi à garder la qualité du code à leur meilleur.



Après avoir étudié plus en détail les avertissements de l'analyseur, j'ai choisi les plus intéressants à mon avis. Voyons donc ce que PVS-Studio a réussi à trouver dans ce projet!





Juste un peu d'insouciance



V6001 Il existe des sous-expressions identiques «traitéesDonnées» à gauche et à droite de l'opérateur «==». CheckpointStatistics.java (229)



@Override
public boolean equals(Object o) 
{
  ....
  CheckpointStatistics that = (CheckpointStatistics) o;
  return id == that.id &&
    savepoint == that.savepoint &&
    triggerTimestamp == that.triggerTimestamp &&
    latestAckTimestamp == that.latestAckTimestamp &&
    stateSize == that.stateSize &&
    duration == that.duration &&
    alignmentBuffered == that.alignmentBuffered &&
    processedData == processedData &&                // <=
    persistedData == that.persistedData &&
    numSubtasks == that.numSubtasks &&
    numAckSubtasks == that.numAckSubtasks &&
    status == that.status &&
    Objects.equals(checkpointType, that.checkpointType) &&
    Objects.equals(
      checkpointStatisticsPerTask, 
      that.checkpointStatisticsPerTask);
}
      
      





Sur fond d'autres expressions en retour, cette erreur n'est pas très frappante. Lors de la substitution de la méthode equals pour la classe CheckpointStatistics , le programmeur a commis une erreur dans l'expression traitéeData == traitéeData , ce qui n'a aucun sens car elle est toujours vraie. De même, le reste de l'expression en retour devait être comparé champ de l'objet courant this et object That : processorData == that.processedData... Cette situation est l'un des modèles d'erreurs typiques des fonctions de comparaison, qui sont décrits en détail dans l'article «Le mal vit dans les fonctions de comparaison ». Et il s'avère que juste "une petite inattention" a cassé la logique de vérification de l'équivalence des objets de la classe CheckpointStatistics .



L'expression est toujours vraie



V6007 L' expression 'input2.length> 0' est toujours vraie. Opérateur.java (283)



public static <T> Operator<T> createUnionCascade(Operator<T> input1, 
                                                 Operator<T>... input2) 
{
  if (input2 == null || input2.length == 0) 
  {
    return input1;                                // <=
  } 
  else if (input2.length == 1 && input1 == null) 
  {
    return input2[0];
  }
  ....
  if (input1 != null) 
  {
    ....
  } 
  else if (input2.length > 0 && input2[0] != null) // <=
  {
    ....
  } 
  else 
  {
    ....
  }
}
      
      





Dans cette méthode, l'analyseur s'est avéré plus attentif qu'une personne, qu'il a décidé de rapporter à sa manière, indiquant que l'expression input2.length> 0 sera toujours vraie. La raison est que si la longueur du tableau input2 est 0, alors la condition input2 == null || input2.length == 0 du premier if dans la méthode sera vrai, et l'exécution de la méthode sera interrompue avant d'atteindre la ligne avec l'expression input2.length> 0 .



Analyseur qui voit tout



L' expression V6007 'slotSharingGroup == null' est toujours fausse. StreamGraphGenerator.java (510)



private <T> Collection<Integer> transformFeedback(....)
{
  ....
  String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
  if (slotSharingGroup == null)
  {
    slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
  }
  ....
}
      
      





L'analyseur a signalé que slotSharingGroup == null est toujours faux. Cela suggère que la méthode determineSlotSharingGroup ne retournera jamais null . L'analyseur est-il si intelligent qu'il a pu calculer toutes les valeurs que cette méthode peut renvoyer? Mieux vaut tout vérifier nous-mêmes:



public class StreamGraphGenerator 
{
  ....
  public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
  ....
  private String determineSlotSharingGroup(String specifiedGroup, 
                                           Collection<Integer> inputIds) 
  {
    if (specifiedGroup != null)
    {
      return specifiedGroup; // <= 1
    }
    else
    {
      String inputGroup = null;
      for (int id: inputIds)
      {
        String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
        if (inputGroup == null)
        {
          inputGroup = inputGroupCandidate;
        }
        else if (!inputGroup.equals(inputGroupCandidate))
        {
          return DEFAULT_SLOT_SHARING_GROUP; // <= 2
        }
      }
      return inputGroup == null 
             ? DEFAULT_SLOT_SHARING_GROUP 
             : inputGroup; // <= 3
    }
  }
  ....
}
      
      





Dans l'ordre, nous parcourons tous les retours et voyons ce qui peut retrouver cette méthode:



  • Le premier retour renverra l'argument à la méthode specifiedGroup , mais uniquement s'il n'est pas nul .
  • return for DEFAULT_SLOT_SHARING_GROUP, ;
  • return inputGroup, null. DEFAULT_SLOT_SHARING_GROUP.


Il s'avère que l'analyseur était vraiment capable de calculer l'impossibilité de renvoyer null à partir de la méthode determinerSlotSharingGroup et nous en a avertis, soulignant l' inutilité de vérifier slotSharingGroup == null . Et bien que cette situation ne soit pas erronée, une telle protection supplémentaire de l'analyseur pourra détecter une erreur dans un autre cas. Par exemple, lorsque vous avez besoin d'une méthode pour renvoyer null dans certaines conditions.



Collectez les tous



L' expression V6007 'currentCount <= lastEnd' est toujours vraie. CountSlidingWindowAssigner.java (75)



V6007 L' expression «lastStart <= currentCount» est toujours vraie. CountSlidingWindowAssigner.java (75)



@Override
public Collection<CountWindow> assignWindows(....) throws IOException 
{
  Long countValue = count.value();
  long currentCount = countValue == null ? 0L : countValue;
  count.update(currentCount + 1);
  long lastId = currentCount / windowSlide;
  long lastStart = lastId * windowSlide;
  long lastEnd = lastStart + windowSize - 1;
  List<CountWindow> windows = new ArrayList<>();
  while (lastId >= 0 && 
         lastStart <= currentCount && 
         currentCount <= lastEnd) 
  {
    if (lastStart <= currentCount && currentCount <= lastEnd) // <=
    {
      windows.add(new CountWindow(lastId));
    }
    lastId--;
    lastStart -= windowSlide;
    lastEnd -= windowSlide;
  }
  return windows;
}
      
      





L'analyseur avertit que les expressions currentCount <= lastEnd et lastStart <= currentCount sont toujours vraies. En effet, si vous regardez la condition de la boucle while , alors il y a exactement les mêmes expressions. Cela signifie qu'à l'intérieur de la boucle, ces expressions seront toujours vraies, donc tous les objets de type CountWindow créés dans la boucle seront ajoutés à la liste des fenêtres . Il existe de nombreuses options pour l'apparence de cette vérification dénuée de sens, et la première chose qui vient à l'esprit est soit un artefact de refactoring, soit un filet de sécurité du développeur. Mais cela peut être une erreur, si vous vouliez vérifier autre chose ...



Ordre des arguments incorrect



V6029 Ordre incorrect possible des arguments passés à la méthode: 'hasBufferForReleasedChannel', 'hasBufferForRemovedChannel'. NettyMessageClientDecoderDelegateTest.java (165), NettyMessageClientDecoderDelegateTest.java (166)



private void testNettyMessageClientDecoding(
       boolean hasEmptyBuffer,
       boolean hasBufferForReleasedChannel,
       boolean hasBufferForRemovedChannel) throws Exception 
{
  ....
  List<BufferResponse> messages = createMessageList (
    hasEmptyBuffer,
    hasBufferForReleasedChannel,
    hasBufferForRemovedChannel);
  ....
}
      
      





Le manque de capacité de Java à appeler une méthode avec des paramètres nommés joue parfois une blague cruelle avec les développeurs. C'est exactement ce qui s'est passé lorsque l' analyseur a pointé vers la méthode createMessageList . En regardant la définition de cette méthode, il devient clair que le paramètre hasBufferForRemovedChannel doit être passé à la méthode avant le paramètre hasBufferForReleasedChannel :



private List<BufferResponse> createMessageList(
  boolean hasEmptyBuffer,
  boolean hasBufferForRemovedChannel,
  boolean hasBufferForReleasedChannel) 
{
  ....
  if (hasBufferForReleasedChannel) {
    addBufferResponse(messages, 
                      releasedInputChannelId, 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  if (hasBufferForRemovedChannel) {
    addBufferResponse(messages, 
                      new InputChannelID(), 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  ....
  return messages;
}
      
      





Cependant, lors de l'appel de la méthode, le développeur a mélangé l'ordre de ces arguments, c'est pourquoi la logique de la méthode createMessageList sera interrompue si les valeurs des arguments mixtes diffèrent.



Oh, ce copier-coller



V6032 Il est étrange que le corps de la méthode «seekToFirst» soit totalement équivalent au corps d'une autre méthode «seekToLast». RocksIteratorWrapper.java (53), RocksIteratorWrapper.java (59)



public class RocksIteratorWrapper implements RocksIteratorInterface, Closeable {
  ....
  private RocksIterator iterator;
  ....

  @Override
  public void seekToFirst() {
    iterator.seekToFirst(); // <=
    status(); 
  }
  
  @Override
  public void seekToLast() {
    iterator.seekToFirst();  // <=
    status();
  }
  
  ....
}
      
      





Les corps des méthodes seekToFirst et seekToLast sont identiques. De plus, les deux méthodes sont utilisées dans le code.



Il y a quelque chose d'impur ici! En effet, si vous regardez les méthodes de l'objet itérateur , il deviendra clair quelle erreur l'analyseur a aidé à trouver:



public class RocksIterator extends AbstractRocksIterator<RocksDB>
{
  ....
}

public abstract class AbstractRocksIterator<....> extends ....
{
  ....
  public void seekToFirst() // <=
  {
    assert this.isOwningHandle();
    this.seekToFirst0(this.nativeHandle_);
  }
  
  public void seekToLast() // <=
  {
    assert this.isOwningHandle();
    this.seekToLast0(this.nativeHandle_);
  }
  ....
}
      
      





Il s'avère que la méthode seekToLast class RocksIteratorWrapper a été créée par copier-coller la méthode seekToFirst de la même classe. Cependant, pour une raison quelconque, le développeur a oublié de remplacer le iterator de » seekToFirst méthode appel avec seekToLast .



Confusion avec les chaînes de format



V6046 Format incorrect. Un nombre différent d'éléments de format est attendu. Arguments non utilisés: 1. UnsignedTypeConversionITCase.java (102)



public static void prepareMariaDB() throws IllegalStateException {
  ....
  if (!initDbSuccess) {
    throw new IllegalStateException(
      String.format(
        "Initialize MySQL database instance failed after {} attempts," + // <=
        " please open an issue.", INITIALIZE_DB_MAX_RETRY));
  }
}
      
      





Les chaînes de format de la méthode String.format et des enregistreurs Java sont différentes. Contrairement à la chaîne de format de la méthode String.format , où les substitutions d'arguments sont spécifiées à l'aide du caractère «%», les chaînes de format de journalisation utilisent à la place la combinaison de caractères «{}». En raison de cette confusion, cette erreur s'est produite. En tant que chaîne de format, une chaîne est transmise à la méthode String.format , qui a probablement été copiée à partir d'un autre endroit où elle a été utilisée dans un enregistreur. Par conséquent, la valeur de champ INITIALIZE_DB_MAX_RETRY ne sera pas substituée dans le message IllegalStateException . au lieu de '{}', et la personne qui détecte ou enregistre cette exception ne saura jamais combien de tentatives de connexion à la base de données ont été faites.



Distribution anormale



V6048 Cette expression peut être simplifiée. Opérande 'index' dans l'opération vaut 0. CollectionUtil.java (76)



public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets;                                 // <=
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))

           .add(element); 
  }

  return buckets.values();
}
      
      





La méthode de partition divise les éléments de la collection d' éléments en plusieurs segments, puis renvoie ces segments. Cependant, en raison de l'erreur signalée par l'analyseur, aucune séparation ne se produira. L'expression utilisée pour déterminer l' index du numéro de segment % numBuckets sera toujours 0, car index est toujours 0. Au départ, je pensais que le code de cette méthode avait été refactorisé, ce qui leur a fait oublier d'ajouter un incrément de la variable d' index dans la boucle for . Mais en regardant le commitlà où cette méthode a été ajoutée, il s'est avéré que cette erreur est venue avec cette méthode. Version corrigée du code:



public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets; 
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))
           .add(element);
    index++;
  }

  return buckets.values();
}
      
      





Type incompatible



V6066 Le type d'objet passé en argument est incompatible avec le type de collection: String, ListStateDescriptor <NextTransactionalIdHint>. FlinkKafkaProducer.java (1083)



public interface OperatorStateStore 
{
  Set<String> getRegisteredStateNames();
}
public class FlinkKafkaProducer<IN> extends ....
{
  ....
  private static final 
  ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>
  NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR = ....;

  @Override
  public void initializeState(FunctionInitializationContext context).... 
  {
    ....
    if (context.getOperatorStateStore()
               .getRegisteredStateNames()
               .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR))    // <=
    {
       migrateNextTransactionalIdHindState(context);
    }
    ....
  }
}
      
      





L'expression pointée par l'analyseur sera toujours fausse, ce qui signifie que l'appel à la méthode migrateNextTransactionalIdHindState ne se produira jamais. Comment se fait-il qu'une personne recherche une collection de type Set <String> pour un élément d'un type complètement différent - ListStateDescriptor <FlinkKafkaProducer.NextTransactionalIdHint> ? Sans l'aide de l'analyseur, une telle erreur, très probablement, aurait vécu dans le code pendant très longtemps, car elle ne frappe pas l'œil et il est tout simplement impossible de la trouver sans une vérification approfondie de cette méthode.



Changement de variable non atomique



V6074 Modification non atomique de la variable volatile. Inspectez 'currentNumAcknowledgedSubtasks'. En attenteCheckpointStats.java (131)



boolean reportSubtaskStats(JobVertexID jobVertexId, SubtaskStateStats subtask) {
  TaskStateStats taskStateStats = taskStats.get(jobVertexId);

  if (taskStateStats != null && taskStateStats.reportSubtaskStats(subtask)) {
    currentNumAcknowledgedSubtasks++;                // <=
    latestAcknowledgedSubtask = subtask;

    currentStateSize += subtask.getStateSize();      // <=

    long processedData = subtask.getProcessedData();
    if (processedData > 0) {
      currentProcessedData += processedData;         // <=
    }

    long persistedData = subtask.getPersistedData();
    if (persistedData > 0) {
      currentPersistedData += persistedData;         // <=
    }
    return true;
  } else {
    return false;
  }
}
      
      





Plus 3 autres avertissements de l'analyseur dans la même méthode:



  • V6074 Modification non atomique de la variable volatile. Inspectez «currentStateSize». En attenteCheckpointStats.java (134)
  • V6074 Modification non atomique de la variable volatile. Inspectez 'currentProcessedData'. En attenteCheckpointStats.java (138)
  • V6074 Modification non atomique de la variable volatile. Inspectez «currentPersistedData». En attenteCheckpointStats.java (143)


L'analyseur a suggéré que jusqu'à 4 champs volatils dans la méthode changent de manière non atomique. Et l'analyseur, comme toujours, s'avère avoir raison, car les opérations ++ et + = sont , en fait, une séquence de plusieurs opérations de lecture-modification-écriture. Comme vous le savez, la valeur volatile d'un champ est visible par tous les threads, ce qui signifie qu'une partie des modifications de champ peut être perdue en raison d'une condition de concurrence. Vous pouvez lire des informations plus détaillées à ce sujet dans la description des diagnostics.



Conclusion



Dans les projets Big Data, la fiabilité est l'une des exigences clés, par conséquent, la qualité du code qu'ils contiennent doit être étroitement surveillée. Les développeurs d'Apache Flink ont ​​été aidés dans ce domaine par plusieurs outils, et ils ont également écrit un nombre important de tests. Cependant, même dans de telles conditions, l'analyseur PVS-Studio a pu trouver des erreurs. Il est impossible de se débarrasser complètement des erreurs, mais l'utilisation régulière de divers outils d'analyse de code statique vous permettra de vous rapprocher de cet idéal. Oui, exactement régulièrement. Ce n'est qu'avec une utilisation régulière que l'analyse statique montre son efficacité, qui est décrite plus en détail dans cet article .





Si vous souhaitez partager cet article avec un public anglophone, veuillez utiliser le lien de traduction: Valery Komarov. Big / Bug Data: analyse du code source Apache Flink .



All Articles