Un taxi sans pilote conduit des canards en caoutchouc jaunes dans la ville! Module de vérification des problèmes pour la plate-forme Gym-Duckietown

D'ici 2040, la plupart des grandes villes du monde conduiront des voitures sans chauffeur, selon les analystes . Mais pour se détendre sur la route dans 20 ans, nous devons faire du bon travail sur les algorithmes de conduite autonome. Pour ce faire, le MIT a développé la plate- forme Duckietown , qui vous permet de le faire à un coût minimal. À Duckietown, des robots mobiles à bas prix transportent des canards en caoutchouc jaune dans un modèle réduit de la ville. Sur la base de cette plate-forme, des Jeux olympiques de conduite IA sont organisés et des cours sont lancés dans les universités sur l'application des technologies d'intelligence artificielle à la gestion des véhicules sans pilote.



Dans cet article, je parlerai de mon projet de cours, sur lequel j'ai travaillé avec le Laboratoire d'algorithmes de robots mobiles JetBrains Research : À propos du vérificateur de problèmes que j'ai écrit pour l' émulateur Gym-Duckietown . Nous parlerons du système de test et de l'intégration de ce système avec des plates-formes éducatives en ligne qui utilisent la technologie External Grader - par exemple, avec la plate- forme Stepik.org .









A propos de l'auteur



Je m'appelle Daniil Plushenko et je suis étudiant en première (deuxième) année du programme de maîtrise « Programmation et analyse de données » au HSE de Saint-Pétersbourg. En 2019, j'ai terminé mon baccalauréat en mathématiques appliquées et en informatique dans la même université.



Plateforme Duckietown



Duckietown est un projet de recherche sur les véhicules autonomes . Les organisateurs du projet ont créé une plateforme qui permet d'introduire une nouvelle approche de l'enseignement dans le domaine de l'intelligence artificielle et de la robotique. Tout a commencé comme un cours au MIT en 2016, mais s'est progressivement répandu dans le monde entier et à différents niveaux d'enseignement: du lycée aux programmes de maîtrise.



La plate-forme Duckietown comprend deux parties. Premièrement, il s'agit d'un modèle réduit d'un environnement de transport urbain avec des routes, des bâtiments, des panneaux de signalisation et des obstacles. Deuxièmement, c'est le transport. De petits robots mobiles (Duckiebots) exécutant un Raspberry Pi reçoivent des informations sur le monde qui les entoure via une caméra et transportent les habitants de la ville - des canards en caoutchouc jaunes - le long des routes.







J'ai travaillé avec l'émulateur Duckietown. On l'appelleGym-Duckietown , et c'est un projet open-source écrit en Python. L'émulateur place votre bot à l'intérieur de la ville, change sa position en fonction de l'algorithme que vous utilisez (ou du bouton sur lequel vous avez appuyé), redessine l'image et écrit la position actuelle du bot dans les journaux.  



Si vous souhaitez essayer, je vous recommande de cloner vous-même le référentiel et d'exécuter manual_control.py : de cette façon, le bot peut être contrôlé à l'aide des flèches du clavier.



Capture d'écran de l'émulateur



L'émulateur peut être utilisé comme environnement pour l'exécution de tâches. Posons le problème suivant: sur une carte donnée, qui se compose d'une voie, vous devez conduire un mètre en ligne droite.







Pour résoudre le problème, vous pouvez utiliser l'algorithme suivant:



for _ in range(25):
    env.step([1, 0])
    env.render()


Cette variable envstocke l'état de l'environnement.

La méthode step prend en entrée action: une liste de deux éléments décrivant l'action du bot. Le premier élément définit la vitesse, le second - l'angle de rotation. La méthode renderredessine l'image en tenant compte de la nouvelle position du bot. Le nombre de pas et la vitesse ont été choisis empiriquement: ce sont les valeurs nécessaires au bot pour parcourir exactement un mètre en ligne droite.



Si vous souhaitez utiliser cet extrait de code dans manual_control.py, collez-le ici . Le code jusqu'à ce point est occupé à charger l'environnement. Pour plus de simplicité, vous pouvez le réutiliser, puis ajouter la solution ci-dessus au problème.



Système de test



Je voudrais pouvoir vérifier automatiquement ces tâches: prendre la mise en œuvre de l'algorithme de contrôle du bot, simuler le voyage et signaler si l'algorithme proposé exécute correctement la tâche. Un tel système de test permettrait d'utiliser l'environnement lors de la préparation des concours en contrôle autonome des transports, ainsi qu'à des fins pédagogiques: pour émettre un ensemble de problèmes aux étudiants et vérifier automatiquement leurs solutions. J'ai été engagé dans son développement tout en travaillant sur un projet de cours et ci-dessous je vais vous parler de ce que j'ai obtenu.



La séquence d'étapes lors de la vérification de la solution ressemble à ceci:







Vous pouvez ajouter vous-même des tâches au système de test ou vous référer à celles que j'ai effectuées. Chaque tâche a un générateur de conditions - une classe qui génère un état d'environnement. Il contient le nom de la carte et la position de départ du bot à l'intérieur de la cellule de départ. Les coordonnées de la cible sont également définies: dans ce cas, il s'agit d'un point à un mètre de la position de départ.



class Ride1MTaskGenerator(TaskGenerator):
    def __init__(self, args):
        super().__init__(args)

    def generate_task(self):
        env_loader = CVTaskEnv if self.args.is_cv_task else TrackingDuckietownEnv
        env = env_loader(
            map_name="straight_road",
            position_on_initial_road_tile=PositionOnInitialRoadTile(
                x_coefficient=0.5,
                z_coefficient=0.5,
                angle=0,
            ),
        )
        self.generated_task['target_coordinates'] = [[env.road_tile_size * 0.5 + 1, 0, env.road_tile_size * 0.5]]
        self.generated_task['env'] = env
        env.render()
        return self.generated_task


Ici TrackingDuckietownEnvet CVTaskEnvsont des classes wrapper qui sont utilisés pour des informations de voyage en magasin pour une analyse ultérieure. 



class TrackingDuckietownEnv:
    def __init__(self, **kwargs):
        self.__wrapped = DuckietownEnv(**kwargs)
    def step(self, action):
        obs, reward, done, misc = self.__wrapped.step(action)
        message = misc['Simulator']['msg']
        if 'invalid pose' in message.lower():
            raise InvalidPoseException(message)
        for t in self.trackers:
            t.track(self)
        return obs, reward, done, misc


Les trackers collectent des informations sur l'état actuel, telles que la position du bot.



CVTaskEnvil est utilisé si une solution est requise en utilisant uniquement les informations de la caméra ("vision par ordinateur"), et non les fonctions de l'émulateur: par exemple, si vous avez besoin de savoir à quelle distance le bot est du centre de la bande ou où se trouve l'objet visible le plus proche. CVTaskEnvL'appel de fonctions d' émulateur peut simplifier la tâche et la classe restreint l'appel des méthodes d'émulateur. Il est utilisé lorsqu'un drapeau est affiché is_cv_task



class CVTaskEnv:
    def __init__(self, **kwargs):
        self.__wrapped = TrackingDuckietownEnv(**kwargs)

    def __getattr__(self, item):
        if item in self.__wrapped.overriden_methods:
            return self.__wrapped.__getattribute__(item)
        ALLOWED_FOR_CV_TASKS = [
            'render', '_CVTaskEnv__wrapped', '_TrackingDuckietownEnv__wrapped',
            'road_tile_size', 'trip_statistics'
        ]
        if item in ALLOWED_FOR_CV_TASKS:
            return self.__wrapped.__getattr__(item)
        else:
            raise AttributeError(item + " call is not allowed in CV tasks")


Une fois l'exécution de la décision terminée - à condition qu'elle n'ait pas été interrompue par un délai d'attente - les informations de déclenchement sont transmises à travers une séquence de vérificateurs. Si tous les vérificateurs ont fonctionné avec succès, le problème est considéré comme résolu correctement. Sinon, un verdict explicatif est affiché - par exemple, un bot s'est écrasé, a quitté la route, etc.



Voici l'un des vérificateurs standard. Il vérifie que le bot est revenu au point de départ à la fin du voyage. Cela peut être utile si, par exemple, dans une tâche, vous devez conduire jusqu'à un certain point puis revenir en arrière.



class SameInitialAndFinalCoordinatesChecker(Checker):
    def __init__(self, maximum_deviation=0.1, **kwargs):
        super().__init__(**kwargs)
        self.maximum_deviation = maximum_deviation

    def check(self, generated_task, trackers, **kwargs):
        trip_statistics = next(x for x in trackers if isinstance(x, TripStatistics))
        trip_data = trip_statistics.trip_data
        if len(trip_data) == 0:
            return True
        initial_coordinates = trip_data[0].position.coordinates
        final_coordinates = trip_data[-1].position.coordinates
        return np.linalg.norm(initial_coordinates - final_coordinates) < self.maximum_deviation


Un tel système de test peut être utilisé en mode manuel, c'est-à-dire démarrer manuellement un test, puis étudier visuellement le verdict. Si nous voulions, par exemple, lancer un cours en ligne sur le transport autonome sur Stepik.org , nous aurions besoin d'une intégration avec la plateforme. Cela sera discuté dans la prochaine partie de l'article.



Intégration avec les plateformes en ligne



Pour les tâches de test, la technologie External Grader est souvent utilisée, qui a été développée par la plate-forme edX .



Lors de l'utilisation de External Grader, la plate-forme éducative ne vérifie pas les tâches elle-même, mais génère une file d'attente de paquets qui sont envoyés à un autre appareil. La fonctionnalité de connexion de file d'attente est implémentée dans le projet xqueue-watcher . Le Xqueue-watcher récupère les parcelles, puis elles sont testées par le validateur (qui fait généralement plus d'actions non triviales que de comparaisons texte / nombre). Après cela, le verdict de vérification est renvoyé du côté de la plateforme éducative.



Considérons plus en détail le moment de la connexion à la file d'attente. Une fois que la plate-forme éducative a fourni les données de connexion, elles devront être ajoutées àles fichiers de configuration et, dans la méthode de notation , implémentez directement le lancement de la vérification. Des instructions plus détaillées peuvent être trouvées ici et ici .



Le Xqueue-watcher appelle le point final get_submission , qui récupérera le package de la file d'attente si possible. Après cela, elle va faire des tests. Le xqueue-watcher appelle ensuite put_result pour renvoyer le verdict.



Vous pouvez démarrer xqueue-watcher comme ceci:



make requirements && python -m xqueue_watcher -d conf.d/


Disons que nous voulons utiliser la technologie External Grader, mais que nous ne voulons pas exécuter le cours sur une plate-forme en ligne. Xqueue-watcher est implémenté en supposant qu'il existe un stockage de fichiers où les fichiers avec des solutions sont téléchargés (les plates-formes ont un tel stockage). Nous pouvons modifier Xqueue afin que le stockage de fichiers ne soit plus nécessaire, et de tels systèmes peuvent être exécutés, en général, même sur notre ordinateur portable.



Vous devez d'abord apprendre à gérer la file d'attente de colis elle-même. La fonctionnalité de file d'attente est fournie par le projet xqueue .





Image tirée de la documentation .



Vous pouvez l'exécuter comme ceci:



apt-get install libaio1 libaio-dev
apt-get install libmysqlclient-dev
pip3 install -r requirements.txt
python3 manage.py migrate
python3 manage.py runserver $xqueue_address


Vous devrez peut-être créer un fichier ~ / edx / edx.log



Par défaut, xqueue donne à xqueue-watcher non pas le contenu des packages, c'est-à-dire des fichiers avec la solution au problème, mais des liens vers ces fichiers dans le stockage de fichiers. Pour être indépendant du stockage de fichiers, vous pouvez envoyer les fichiers eux-mêmes et les stocker sur la même machine que celle sur laquelle xqueue-watcher s'exécute. 



Voici comment le code source doit être modifié pour y parvenir:



L'implémentation de la méthode _upload dans lms_interface.py est remplacée par celle-ci:



def _upload(file_to_upload, path, name):
    '''
    Upload file using the provided keyname.
    Returns:
        URL to access uploaded file
    '''
    full_path = os.path.join(path, name)
    return default_storage.save(full_path, file_to_upload)


Si aucun stockage de fichiers n'était connecté, cette méthode enregistrera le fichier avec la solution dans le chemin $ queue_name / $ file_to_upload_hash.



Dans l'implémentation de get_sumbission dans le fichier ext_interface.py, au lieu de cette ligne, écrivez:



xqueue_files = json.loads(submission.urls)
for xqueue_file in xqueue_files.keys():
    with open(xqueue_files[xqueue_file], 'r') as f:
        xqueue_files[xqueue_file] = f.readlines()


Ne transférons pas les liens (chemins) vers les fichiers, mais leur contenu.



Chaque solution est exécutée dans un conteneur docker «unique» avec des ressources limitées, c'est-à-dire qu'un conteneur distinct est créé pour l'exécution de chaque solution, qui est supprimé une fois le test terminé. Pour créer de tels conteneurs et y exécuter des commandes, le portainer-api est utilisé (en fait, comme wrapper sur l'API Docker).



Résultat



Dans cet article, j'ai parlé de la création du système de test et des tâches de transport autonome, ainsi que de l'intégration de ce système avec des plateformes éducatives en ligne utilisant la technologie External Grader. J'espère qu'un cours utilisant ce système sera bientôt lancé, et la partie sur l'intégration avec les plateformes en ligne sera utile pour ceux qui souhaitent créer leur propre cours hors ligne ou en ligne.



All Articles