"Caméra intelligente" basée sur Raspberry Pi contrôlée via le bot Telegram

Salut, je m'appelle Ivan. Je réponds tout de suite à la question principale : pourquoi ai-je assemblé le système moi-même au lieu de prendre une solution toute faite ? Premièrement, à cause du coût des solutions toutes faites : le Raspberry Pi avec tous les capteurs et une caméra ne m'est pas revenu à plus de 30 $, la plupart des pièces ayant été achetées quand le dollar valait encore 60 roubles. Deuxièmement, presque toutes les pièces étaient déjà là : mon frère m'a donné le Raspberry Pi, la caméra datait de temps immémoriaux, j'avais aussi une diode, achetée pour Arduino, et le capteur de mouvement ne coûtait pas plus de 100 roubles sur Aliexpress.





L'histoire de l'article sera structurée comme suit:





  1. Définissons ce dont nous avons besoin;





  2. Travaillons avec la diode;





  3. Avec détecteur de mouvement;





  4. Avec un appareil photo (photo);





  5. Avec une caméra (vidéo);





  6. Analysons le travail avec le bot Telegram (considérons le squelette);





  7. Créons une "caméra intelligente";





  8. Voyons comment fonctionne la caméra intelligente;





  9. Identifions les goulots d'étranglement et les solutions possibles.





Alors allons-y





De quoi avons nous besoin

Internet doit être configuré sur le Raspberry, et les éléments suivants doivent y être installés :





  • ffmpeg - pour enregistrer une vidéo à partir d'une caméra;





  • Python 3.7.





Python doit avoir les bibliothèques suivantes:





  • RPi.GPIO;





  • pygame;





  • telebot.





Vous aurez aussi besoin de fils F2F (femelle-femelle) pour connecter le tout.





Travailler avec une diode

import RPi.GPIO as GPIO
import time

LED_PIN = 3

def setup():
    """Prepare the GPIO header so the LED pin can be driven as an output."""
    # Silence RPi.GPIO warnings before any channel is configured.
    GPIO.setwarnings(False)
    # BOARD numbering: pins are addressed by their physical header position.
    GPIO.setmode(GPIO.BOARD)
    GPIO.setup(LED_PIN, GPIO.OUT)

def destroy():
    """Switch the LED off and release the GPIO channels used by this script."""
    led_off = GPIO.LOW
    GPIO.output(LED_PIN, led_off)
    GPIO.cleanup()

def blink():
    """Light the LED for one second, then turn it off again."""
    on_duration_seconds = 1

    GPIO.output(LED_PIN, GPIO.HIGH)
    time.sleep(on_duration_seconds)
    GPIO.output(LED_PIN, GPIO.LOW)

def main():
    """Blink the LED once, always releasing the GPIO pins afterwards."""
    setup()
    try:
        blink()
    finally:
        # Runs even if blink() is interrupted (e.g. Ctrl+C during the sleep),
        # so the LED is never left lit and the pins are always released.
        destroy()

if __name__ == '__main__':
    main()
      
      



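Nous importons la bibliothèque permettant de travailler avec les broches GPIO du Raspberry ainsi que le module time. LED_PIN est le numéro de la broche du Raspberry à laquelle la diode est connectée. La fonction setup choisit la numérotation des broches, désactive les avertissements et déclare la broche en sortie ; destroy éteint la diode et libère les broches. La fonction blink allume la diode, attend 1 seconde, puis l'éteint. La fonction main appelle successivement setup, blink et destroy.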





Raspberry , , - :





- delay sensitivity , :





, , . "", , :





, - (16 ).





import RPi.GPIO as GPIO
import time

PIR_PIN = 11

def setup():
    """Prepare the GPIO header so the PIR sensor pin can be read as an input."""
    # Silence RPi.GPIO warnings before any channel is configured.
    GPIO.setwarnings(False)
    # BOARD numbering: pins are addressed by their physical header position.
    GPIO.setmode(GPIO.BOARD)
    GPIO.setup(PIR_PIN, GPIO.IN)

def destroy():
    """Release the GPIO channels configured by this script."""
    GPIO.cleanup()

def sensorJob():
    """Poll the PIR sensor forever, printing each reading every 0.1 s.

    This function never returns on its own; it only ends when an exception
    (such as KeyboardInterrupt) propagates out of the loop.
    """
    poll_interval_seconds = 0.1

    while True:
        reading = GPIO.input(PIR_PIN)
        print("PIR value: " + str(reading))
        time.sleep(poll_interval_seconds)

def main():
    """Print PIR readings until interrupted, then release the GPIO pins."""
    setup()
    try:
        sensorJob()
    except KeyboardInterrupt:
        # sensorJob() loops forever; Ctrl+C is the expected way to stop it.
        pass
    finally:
        # Previously unreachable: the cleanup now runs however the loop ends.
        destroy()

if __name__ == '__main__':
    main()
      
      



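Le principe est le même que pour la diode, si ce n'est que la broche PIR_PIN est configurée en entrée. La fonction sensorJob lit l'état du capteur dans une boucle infinie et l'affiche toutes les 0,1 seconde.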





Travailler avec la caméra (photo)

"" . USB-, , . , . 1 /dev/video0. , .





from datetime import datetime
import pygame
import pygame.camera

# Initialise pygame and its camera module before creating a camera object.
pygame.init()
pygame.camera.init()
# NOTE(review): the returned device list is discarded; the device path is
# hard-coded below instead of being taken from this list.
pygame.camera.list_cameras() 

# Camera object for /dev/video0, requested at a 640x426 frame size.
cam = pygame.camera.Camera("/dev/video0", (640,426))

def saveCapture():
    """Grab one frame from the camera and save it as a timestamped JPEG.

    The file is written to the current working directory and is named after
    the capture time, formatted as ``dd-mm-YYYY HH:MM:SS`` plus ``.jpg``.
    """
    filename = datetime.now().strftime('%d-%m-%Y %H:%M:%S') + '.jpg'

    cam.start()
    try:
        pygame.image.save(cam.get_image(), filename)
    finally:
        # Stop the camera even if grabbing or saving fails, so the device is
        # not left started for the next capture.
        cam.stop()

def main():
    """Take a single photo and save it to disk."""
    # This script uses no GPIO, and it defines no setup()/destroy() helpers;
    # calling them here raised NameError before any photo was taken.
    saveCapture()

if __name__ == '__main__':
    main()
      
      



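Nous initialisons pygame et son module caméra, puis créons l'objet caméra pour /dev/video0 avec une taille d'image de 640x426. La fonction saveCapture construit le nom du fichier à partir de la date et de l'heure, démarre la caméra avec start, récupère l'image avec get_image(), l'enregistre avec pygame.image.save, puis arrête la caméra avec stop.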





Travailler avec la caméra (vidéo)

:





ffmpeg -f v4l2 -framerate 25 -video_size 640x426 -i /dev/video0 -t 5 -c copy <filename>
      
      



- v4l2, , , -c copy, , , preview Telegram , . , Telegram, .





Travailler avec le bot Telegram

Telegram- , BotFather Telegram, , . , InlineKeyboard Telegram-, , . , ID , ( ) .





from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
import telebot

TOKEN = '99999999999:xxxxxxxxxxxxxxxxxx'
ADMIN_USER_ID = 999999999

bot = telebot.TeleBot(TOKEN)

@bot.message_handler(commands=['start'])
def start(message):
    """Reply to /start: greet strangers, show the command keyboard to the admin."""
    chat_id = message.chat.id

    # Anyone but the configured administrator only gets a greeting.
    if message.from_user.id != ADMIN_USER_ID:
        bot.send_message(chat_id, text="Hello. My name is James Brown. What do i do for you?")
        return

    capture_button = InlineKeyboardButton("Send capture", callback_data='sendCapture')
    video_button = InlineKeyboardButton("Send video", callback_data='sendVideo')
    # One row holding both buttons.
    menu = InlineKeyboardMarkup([[capture_button, video_button]])

    bot.send_message(chat_id=chat_id, text="Supported commands:", reply_markup=menu)

def sendCapture(chat_id):
    """Send the photo file to ``chat_id`` (``<filename>`` is a placeholder path)."""
    # The context manager closes the file once the upload call returns;
    # the bare open() used before left the handle open.
    with open('<filename>', 'rb') as photo:
        bot.send_photo(chat_id, photo=photo)

def sendVideo(chat_id):
    """Send the video file to ``chat_id`` (``<filename>`` is a placeholder path)."""
    # The context manager closes the file once the upload call returns;
    # the bare open() used before left the handle open.
    with open('<filename>', 'rb') as video:
        bot.send_video(chat_id, video)

@bot.callback_query_handler(func=lambda call: True)
def button(call):
    """Run the action attached to the inline button that was pressed.

    ``call.data`` carries the ``callback_data`` string of the button; it is
    mapped to a handler that receives the chat id of the originating message.
    """
    # Explicit allow-list instead of globals(): callback data arrives from the
    # network and must not be able to name an arbitrary module-level object.
    actions = {
        'sendCapture': sendCapture,
        'sendVideo': sendVideo,
    }

    # start() only offers the keyboard to the administrator; apply the same
    # rule to whoever actually presses a button.
    if call.from_user.id != ADMIN_USER_ID:
        return

    action = actions.get(call.data)
    if action is not None:
        action(call.message.chat.id)

def main():
    """Start polling Telegram for updates (``interval=5``, ``timeout=20``)."""
    # This skeleton uses no GPIO, and it defines no setup()/destroy() helpers;
    # calling them here raised NameError before polling could start.
    bot.polling(none_stop=False, interval=5, timeout=20)

if __name__ == '__main__':
    main()
      
      



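Nous importons les classes nécessaires à InlineKeyboard et la bibliothèque du bot Telegram. TOKEN est le jeton obtenu auprès de BotFather. ADMIN_USER_ID est l'ID de l'administrateur. Vient ensuite le gestionnaire de la commande start. Dans start, nous comparons l'ID de l'utilisateur à celui de l'administrateur : s'il diffère, nous répondons par un message de salutation et sortons ; sinon, nous envoyons le clavier InlineKeyboard avec les commandes prises en charge. Les fonctions sendCapture et sendVideo envoient respectivement une photo et une vidéo via le bot Telegram. Le gestionnaire des boutons InlineKeyboard appelle la fonction dont le nom correspond au callback_data du bouton pressé. Dans main, nous lançons bot.polling, qui interroge le serveur Telegram, avec interval = 5.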





" "

GPIO , USB.





:





, " " Telegram-.





from datetime import datetime
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
import telebot
import logging
import pygame
import pygame.camera
import RPi.GPIO as GPIO
import threading
import time
import os

# Bot token (placeholder value) and the Telegram user id that start() treats
# as the administrator.
TOKEN = '99999999999:xxxxxxxxxxxxxxxxxxxxxx'
ADMIN_USER_ID = 999999999999
# Pin numbers for the LED and the PIR sensor; setup() selects BOARD numbering.
LED_PIN = 3
PIR_PIN = 11
# Extension appended to recorded video names and used to find them again.
VIDEO_FILE_FORMAT = '.mkv'

# Enable Logging
logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO)
logger = logging.getLogger(__name__)
# This module's logger is set to DEBUG while the root configuration is INFO.
logger.setLevel(logging.DEBUG)

# State flags read by the sensorJob worker loop.
isSensorEnabled = False
isMuteNotifications = False

# Chat that receives sensor notifications; -1 presumably means "none".
last_chat_id = -1
# Rows of inline buttons sent along with the "Supported commands:" message.
keyboard = []

# Initialise pygame and its camera module before creating a camera object.
pygame.init()
pygame.camera.init()
# NOTE(review): the returned device list is discarded; the device path is
# hard-coded below instead of being taken from this list.
pygame.camera.list_cameras() 

# Camera object for /dev/video0, requested at a 640x426 frame size.
cam = pygame.camera.Camera("/dev/video0", (640,426))
bot = telebot.TeleBot(TOKEN)

def setup():
    """Configure the GPIO header: LED pin as output, PIR sensor pin as input."""
    # Silence RPi.GPIO warnings before any channel is configured.
    GPIO.setwarnings(False)
    # BOARD numbering: pins are addressed by their physical header position.
    GPIO.setmode(GPIO.BOARD)

    for pin, direction in ((LED_PIN, GPIO.OUT), (PIR_PIN, GPIO.IN)):
        GPIO.setup(pin, direction)

def destroy():
    """Switch the LED off and release the GPIO channels used by the program."""
    led_off = GPIO.LOW
    GPIO.output(LED_PIN, led_off)
    GPIO.cleanup()

def log_params(method_name, message):
    """Write a DEBUG record describing an incoming Telegram message.

    Args:
        method_name: Name of the handler that received the message.
        message: The telebot message; its ``from_user``, ``chat.id`` and
            ``text`` attributes are logged.
    """
    # Pass the values as logging arguments instead of pre-formatting with %:
    # the string is only built when DEBUG is enabled, and a formatting problem
    # is reported by the logging module rather than raised inside the handler.
    logger.debug("Method: %s\nFrom: %s\nchat_id: %d\nText: %s",
                 method_name,
                 message.from_user,
                 message.chat.id,
                 message.text)

@bot.message_handler(commands=['start'])
def start(message):
    """Handle /start: log the request and show the main menu to the admin only."""
    global keyboard

    log_params('start', message)

    chat_id = message.chat.id

    # Anyone but the configured administrator only gets a greeting.
    if message.from_user.id != ADMIN_USER_ID:
        bot.send_message(chat_id, text="Hello. My name is James Brown. What do i do for you?")
        return

    sensor_row = [InlineKeyboardButton("Start sensor", callback_data='start_sensor')]
    media_row = [
        InlineKeyboardButton("Get capture", callback_data='get_capture'),
        InlineKeyboardButton("Get video", callback_data='get_video'),
    ]
    # Stored in the module-level variable so other handlers can re-send it.
    keyboard = [sensor_row, media_row]

    bot.send_message(chat_id=chat_id,
                     text="Supported commands:",
                     reply_markup=InlineKeyboardMarkup(keyboard))

def sendCapture(chat_id):
    """Send a photo named after the current second to ``chat_id``.

    If a file with that name already exists in the working directory it is
    sent as is; otherwise a frame is captured from the camera and saved first.

    Args:
        chat_id: Telegram chat that receives the photo.
    """
    filename = datetime.now().strftime('%d-%m-%Y %H:%M:%S') + '.jpg'

    if not os.path.exists(filename):
        cam.start()
        try:
            pygame.image.save(cam.get_image(), filename)
        finally:
            # Stop the camera even if grabbing or saving fails.
            cam.stop()

    # The context manager closes the file once the upload call returns;
    # the bare open() used before left the handle open.
    with open(filename, 'rb') as photo:
        bot.send_photo(chat_id, photo=photo)

def get_capture(chat_id):
    """Handle the "Get capture" button: send a photo, then show the menu again."""
    sendCapture(chat_id)

    menu = InlineKeyboardMarkup(keyboard)
    bot.send_message(chat_id=chat_id, text="Supported commands:", reply_markup=menu)

def sendVideo(chat_id):
    """Send the most recently written video in the working directory.

    Only files whose name ends with ``VIDEO_FILE_FORMAT`` are considered.
    When no such file exists a warning is logged and nothing is sent.

    Args:
        chat_id: Telegram chat that receives the video.
    """
    videos = [name for name in os.listdir() if name.endswith(VIDEO_FILE_FORMAT)]
    if not videos:
        # The previous [-1] lookup raised IndexError on an empty directory.
        logger.warning("No %s recordings found; nothing to send", VIDEO_FILE_FORMAT)
        return

    # Names start with the day of month (dd-mm-YYYY ...), so alphabetical order
    # is not chronological order; pick the newest file by modification time.
    latest = max(videos, key=os.path.getmtime)

    # The context manager closes the file once the upload call returns.
    with open(latest, 'rb') as video:
        bot.send_video(chat_id, video)

def captureVideo():
    """Record a 5-second clip from /dev/video0 with ffmpeg.

    The clip is written to the working directory under a name built from the
    current time (``dd-mm-YYYY HH:MM:SS``) plus ``VIDEO_FILE_FORMAT``. The call
    blocks until ffmpeg exits.

    Returns:
        The file name that was passed to ffmpeg. A failed recording is logged
        but the name is still returned, as before.
    """
    import subprocess  # local import: keeps this block self-contained

    filename = datetime.now().strftime('%d-%m-%Y %H:%M:%S') + VIDEO_FILE_FORMAT

    # Argument list instead of a shell string: no quoting of the file name
    # (which contains spaces) is needed and no shell is involved.
    command = [
        "ffmpeg",
        "-f", "v4l2",
        "-framerate", "25",
        "-video_size", "640x426",
        "-i", "/dev/video0",
        "-t", "5",
        "-c", "copy",
        filename,
    ]

    try:
        result = subprocess.run(command)
    except OSError:
        # E.g. ffmpeg is not installed; os.system only returned a status here.
        logger.exception("Could not launch ffmpeg")
    else:
        if result.returncode != 0:
            logger.warning("ffmpeg exited with status %d", result.returncode)

    return filename

def get_video(chat_id):
    """Handle the "Get video" button: record a clip, send it, show the menu.

    Progress messages are sent before the recording and before the upload.

    Args:
        chat_id: Telegram chat that receives the messages and the video.
    """
    bot.send_message(chat_id=chat_id,
                     text="Capturing video..")

    filename = captureVideo()

    bot.send_message(chat_id=chat_id,
                     text="Sending video..")

    # The context manager closes the file once the upload call returns;
    # the bare open() used before left the handle open.
    with open(filename, 'rb') as video:
        bot.send_video(chat_id, video)

    bot.send_message(chat_id=chat_id,
                     text="Supported commands:",
                     reply_markup=InlineKeyboardMarkup(keyboard))

def sensorJob():
    """Motion-detection loop; start_sensor() runs it in a separate thread.

    While ``isSensorEnabled`` is true the PIR pin is read and mirrored on the
    LED. When motion starts a photo is sent (unless muted) and clips are
    recorded on every pass; when motion stops the last clip is sent (unless
    muted). After the loop ends the default keyboard is restored and the chat
    in ``last_chat_id`` is told that the sensor stopped.
    """
    global isSensorEnabled
    global keyboard
    
    # True from the first motion reading until the sensor reads 0 again.
    isRecording = False
    
    while isSensorEnabled:
        i = GPIO.input(PIR_PIN)
        
        # Mirror the sensor reading on the LED.
        GPIO.output(LED_PIN, i)
        
        # Motion just started: remember it and notify with a photo.
        if (i == 1 and not isRecording):
            isRecording = True
            
            if (not isMuteNotifications):
                sendCapture(last_chat_id)
        
        # Record one clip per pass while motion lasts. captureVideo() passes
        # "-t 5" to ffmpeg, so presumably each pass takes about 5 seconds and
        # the enable flag is only re-checked between clips.
        if (isRecording):
            captureVideo()
        
        # Motion ended: send the last recorded clip.
        if (i == 0 and isRecording):
            if (not isMuteNotifications):
                sendVideo(last_chat_id)
            
            isRecording = False
        
        time.sleep(0.1)
        
    # NOTE(review): unlike the send inside the loop, this one does not check
    # isMuteNotifications - confirm that this is intended.
    if (isRecording):
        sendVideo(last_chat_id)
    
    # Restore the default menu now that the sensor is no longer running.
    keyboard = [
        [InlineKeyboardButton("Start sensor", callback_data='start_sensor')],
        [
            InlineKeyboardButton("Get capture", callback_data='get_capture'),
            InlineKeyboardButton("Get video", callback_data='get_video')
        ],
    ]

    # NOTE(review): last_chat_id is read here after the stop request; verify
    # it still holds a real chat id at this point.
    bot.send_message(chat_id=last_chat_id,
                     text="Sensor stopped")
    bot.send_message(chat_id=last_chat_id,
                         text="Supported commands:",
                         reply_markup=InlineKeyboardMarkup(keyboard))

def start_sensor(chat_id):
    """Handle the "Start sensor" button: launch the motion-detection worker.

    Remembers ``chat_id`` as the notification target, enables the sensor flag,
    starts ``sensorJob`` in a new thread and shows the sensor control keyboard.
    If the sensor flag is already set, no second worker is started.

    Args:
        chat_id: Telegram chat that pressed the button.
    """
    global keyboard
    global isSensorEnabled
    global last_chat_id

    # A "Start sensor" button from an older message can be pressed again while
    # the worker is running; two workers would compete for the same camera.
    if isSensorEnabled:
        bot.send_message(chat_id=chat_id,
                         text="Sensor already started")
        return

    last_chat_id = chat_id
    isSensorEnabled = True

    threading.Thread(target=sensorJob).start()

    keyboard = [
        [
            InlineKeyboardButton("Stop sensor", callback_data='stop_sensor'),
            InlineKeyboardButton("Mute notifications", callback_data='mute_notifications')
        ]
    ]

    bot.send_message(chat_id=chat_id,
                     text="Sensor started")
    bot.send_message(chat_id=chat_id,
                     text="Supported commands:",
                     reply_markup=InlineKeyboardMarkup(keyboard))
    
def stop_sensor(chat_id):
    """Handle the "Stop sensor" button: ask the sensor worker to finish.

    Clears the sensor flag, switches the LED off, restores the default
    keyboard and confirms the request. The worker itself reports
    "Sensor stopped" once its loop has ended.

    Args:
        chat_id: Telegram chat that pressed the button.
    """
    global keyboard
    global isSensorEnabled

    # Without the ``global`` declaration above, this assignment only created a
    # local variable and the worker loop never saw the stop request.
    isSensorEnabled = False

    # last_chat_id is deliberately left untouched: the worker still reads it
    # after its loop ends to send the last video and the "Sensor stopped"
    # message, and -1 is not a chat it could deliver them to.

    GPIO.output(LED_PIN, GPIO.LOW)

    keyboard = [
        [InlineKeyboardButton("Start sensor", callback_data='start_sensor')],
        [
            InlineKeyboardButton("Get capture", callback_data='get_capture'),
            InlineKeyboardButton("Get video", callback_data='get_video')
        ],
    ]

    bot.send_message(chat_id=chat_id,
                     text="Sensor stop requested")

def mute_notifications(chat_id):
    """Handle the "Mute notifications" button.

    Sets the mute flag checked by the sensor worker and swaps the keyboard so
    that it offers "Unmute notifications" instead.

    Args:
        chat_id: Telegram chat that pressed the button.
    """
    global keyboard
    global isMuteNotifications

    # Without the ``global`` declaration above, this assignment only created a
    # local variable and the module-level flag never changed.
    isMuteNotifications = True

    keyboard = [
        [
            InlineKeyboardButton("Stop sensor", callback_data='stop_sensor'),
            InlineKeyboardButton("Unmute notifications", callback_data='unmute_notifications')
        ]
    ]

    bot.send_message(chat_id=chat_id,
                     text="Notifications muted")
    bot.send_message(chat_id=chat_id,
                     text="Supported commands:",
                     reply_markup=InlineKeyboardMarkup(keyboard))

def unmute_notifications(chat_id):
    """Handle the "Unmute notifications" button.

    Clears the mute flag checked by the sensor worker and swaps the keyboard
    so that it offers "Mute notifications" again.

    Args:
        chat_id: Telegram chat that pressed the button.
    """
    global keyboard
    global isMuteNotifications

    # Without the ``global`` declaration above, this assignment only created a
    # local variable and the module-level flag never changed.
    isMuteNotifications = False

    keyboard = [
        [
            InlineKeyboardButton("Stop sensor", callback_data='stop_sensor'),
            InlineKeyboardButton("Mute notifications", callback_data='mute_notifications')
        ]
    ]

    bot.send_message(chat_id=chat_id,
                     text="Notifications unmuted")
    bot.send_message(chat_id=chat_id,
                     text="Supported commands:",
                     reply_markup=InlineKeyboardMarkup(keyboard))

@bot.callback_query_handler(func=lambda call: True)
def button(call):
    """Run the action attached to the inline button that was pressed.

    ``call.data`` carries the ``callback_data`` string of the button; it is
    mapped to a handler that receives the chat id of the originating message.
    Presses from non-administrators and unknown callback data are ignored
    (the latter with a warning in the log).
    """
    # Explicit allow-list instead of globals(): callback data arrives from the
    # network and must not be able to name an arbitrary module-level object
    # such as setup, destroy or main.
    actions = {
        'start_sensor': start_sensor,
        'stop_sensor': stop_sensor,
        'get_capture': get_capture,
        'get_video': get_video,
        'mute_notifications': mute_notifications,
        'unmute_notifications': unmute_notifications,
    }

    # start() only offers the keyboard to the administrator; apply the same
    # rule to whoever actually presses a button.
    if call.from_user.id != ADMIN_USER_ID:
        return

    action = actions.get(call.data)
    if action is None:
        logger.warning("Ignoring unknown callback data: %r", call.data)
        return

    action(call.message.chat.id)

def main():
    """Configure the GPIO pins, poll Telegram, and always clean up on exit."""
    setup()
    try:
        bot.polling(none_stop=False, interval=5, timeout=20)
    finally:
        # With none_stop=False polling can end with an exception; the LED and
        # the GPIO pins are released however polling ends.
        destroy()

if __name__ == '__main__':
    main()
      
      



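Nous importons les bibliothèques nécessaires et définissons les constantes. Le module logging est configuré pour tracer ce qui se passe. Viennent ensuite les drapeaux d'état du capteur et des notifications, le dernier chat_id et le clavier courant. Nous initialisons pygame et le bot Telegram. Les fonctions setup et destroy sont celles déjà vues, réunies pour la diode et le capteur. start vérifie le user_id et, s'il s'agit de l'administrateur, envoie le clavier des commandes. sendCapture génère un filename, prend la photo et l'envoie au chat_id. get_capture envoie la photo puis réaffiche le clavier. sendVideo envoie la dernière vidéo enregistrée. captureVideo génère un filename et enregistre la vidéo avec ffmpeg. get_video enregistre une vidéo, l'envoie et réaffiche le clavier.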





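sensorJob : tant que le capteur est activé, la fonction lit son état et le reporte sur la diode. Lorsqu'un mouvement apparaît, elle envoie une photo puis enregistre des vidéos ; lorsque le mouvement cesse, elle envoie la dernière vidéo. sensorJob envoie ses notifications au chat mémorisé dans last_chat_id. start_sensor mémorise le chat_id, active le capteur et lance sensorJob dans un thread séparé. stop_sensor demande l'arrêt du capteur et éteint la diode. mute/unmute_notifications désactivent ou réactivent l'envoi des notifications.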





" "

- , 5 . bot.pooling main, interval. -, , , , - "Connection timeout" Telegram- - .





  1. , ;





  2. Python , , pyffmpeg, ;





  3. , ;





  4. Disponibilité d'Internet rapide pour une réception plus rapide de vidéos à jour.








All Articles