La journée de télétravail de vendredi touchait déjà à sa fin lorsqu'on a frappé à la porte pour annoncer l'installation d'un nouvel interphone. En apprenant que le nouvel interphone dispose d'une application mobile qui permet de répondre aux appels sans être à la maison, cela m'a intéressé et je l'ai immédiatement téléchargée sur mon téléphone. Après m'être connecté, j'ai découvert une fonctionnalité intéressante de cette application : même sans appel actif vers mon appartement, je pouvais regarder par la caméra de l'interphone et ouvrir la porte à tout moment. « Oui, c'est ARI en ligne à la porte d'entrée ! » — ça a fait tilt dans ma tête. Le sort du week-end à venir était scellé.
Démonstration vidéo en fin d'article.
Avertissement
. , - â .
API
, , . - â , . lkit â , http(s) Android .
â Android- Certificate authority , . , Android 7 .
root , Android, Android Studio. ADB , Certificate pinning .
, â , .
:
: POST
/rest/v1/places/{place_id}/accesscontrols/{control_id}/actions
JSON-{"name": "accessControlOpen"}
() : GET
/rest/v1/places/{place_id}/accesscontrols/{control_id}/videosnapshots
: GET
/rest/v1/forpost/cameras/{camera_id}/video?LightStream=0
HTTP Authorization â , . Advanced REST Client, , Authorization API , , .
Python requests
, :
# Headers sent with every API request; "###" is a placeholder for the real
# bearer token.
HEADERS = {"Authorization": "Bearer ###"}
# Base URL of one access control (door). It deliberately has NO trailing
# slash: callers append "/videosnapshots" or "/actions" themselves, and a
# trailing slash here would produce "//" in the request path.
ACTION_URL = "https://###.ru/rest/v1/places/###/accesscontrols/###"
# Endpoint that returns the link to the camera's video stream.
VIDEO_URL = "https://###.ru/rest/v1/forpost/cameras/###/video?LightStream=0"
def get_image(timeout=10):
    """Fetch the current snapshot from the intercom camera.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the caller forever.

    Returns:
        The raw response body (the encoded image bytes) on HTTP 200,
        otherwise None. Network errors are logged and also yield None.
    """
    try:
        result = requests.get(
            f'{ACTION_URL}/videosnapshots', headers=HEADERS, timeout=timeout)
    except requests.RequestException as err:
        # Keep the "None on failure" contract instead of letting a transient
        # network problem propagate to the caller.
        logging.error(f"Failed to get an image: {err}")
        return None
    if result.status_code != 200:
        logging.error(f"Failed to get an image with status code {result.status_code}")
        return None
    logging.warning(f"Image received successfully in {result.elapsed.total_seconds()}sec")
    return result.content
def open_door(timeout=10):
    """Ask the intercom API to open the door.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the caller forever.

    Returns:
        True when the API answered with HTTP 200, False on any other status
        code or on a network error (which is logged).
    """
    try:
        result = requests.post(
            f'{ACTION_URL}/actions', headers=HEADERS,
            json={"name": "accessControlOpen"}, timeout=timeout)
    except requests.RequestException as err:
        # Keep the "False on failure" contract instead of raising.
        logging.error(f"Failed to open the door: {err}")
        return False
    if result.status_code != 200:
        logging.error(f"Failed to open the door with status code {result.status_code}")
        return False
    logging.warning(f"Door opened successfully in {result.elapsed.total_seconds()}sec")
    return True
def get_videostream_link(timeout=10):
    """Request the URL of the camera's video stream.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the caller forever.

    Returns:
        The value of ``data.URL`` from the JSON response on success, or
        False when the request fails, the status code is not 200, or the
        response does not have the expected shape.
    """
    try:
        result = requests.get(VIDEO_URL, headers=HEADERS, timeout=timeout)
    except requests.RequestException as err:
        # Keep the "False on failure" contract instead of raising.
        logging.error(f"Failed to get stream link: {err}")
        return False
    if result.status_code != 200:
        logging.error(f"Failed to get stream link with status code {result.status_code}")
        return False
    try:
        link = result.json()['data']['URL']
    except (ValueError, KeyError, TypeError) as err:
        # A 200 answer that is not JSON or lacks data.URL is still a failure.
        logging.error(f"Failed to get stream link from response: {err!r}")
        return False
    logging.warning(f"Stream link received successfully in {result.elapsed.total_seconds()}sec")
    return link
, â Intel(R) Xeon(R) CPU E5-2650L v3 @ 1.80GHz
, 1GB 0 GPU. , , .
, . OpenVINO Toolkit â Intel, CPU.
Interactive Face Recognition Demo â , . , - 2020.3, pip 2021.1. OpenVINO .
, . ( ), , , :
class ImageProcessor:
    """Turn a frame into the list of identity labels found in it."""

    def __init__(self):
        self.frame_processor = FrameProcessor()

    def process(self, image):
        """Run the frame processor on *image* and return one label per detection.

        The frame processor returns parallel sequences (region, landmarks,
        identity); only the identity's ``id`` is used, and it is translated
        to a label by the frame processor's face identifier.
        """
        found = self.frame_processor.process(image)
        return [
            self.frame_processor.face_identifier.get_identity_label(who.id)
            for _region, _points, who in zip(*found)
        ]
. , get_image()
.
100 runs on an image with known face:
Total time: 7.356s
Time per frame: 0.007s
FPS: 135.944
100 runs on an image without faces:
Total time: 2.985s
Time per frame: 0.003s
FPS: 334.962
, .
1 FPS:
, , . , MVP get_image()
.
class ImageProcessor:
    # <...>
    def process_single_image(self, image):
        """Decode an in-memory encoded image and return its identity labels.

        Args:
            image: Bytes-like object holding the encoded image.

        Returns:
            Whatever ``self.process`` returns for the decoded frame.
        """
        # np.frombuffer replaces np.fromstring, whose binary mode is
        # deprecated; it also avoids copying the input bytes.
        encoded = np.frombuffer(image, np.uint8)
        frame = cv2.imdecode(encoded, cv2.IMREAD_COLOR)
        return self.process(frame)
def snapshot_based_intercom_id():
    """Poll camera snapshots forever and open the door for known faces.

    Each iteration fetches one snapshot with get_image(), runs recognition
    on it and, if any label contains "face", opens the door and stores the
    snapshot under ``images/``. The door is opened at most once every
    5 seconds. This function never returns.
    """
    processor = ImageProcessor()
    last_open_door_time = time.time()
    while True:
        start_time = time.time()
        image = get_image()
        if image is None:
            # get_image() returns None on failure (and logs it); there is
            # nothing to decode, so pause briefly and try again instead of
            # crashing in the decoder or hammering the API.
            time.sleep(1)
            continue
        result = processor.process_single_image(image)
        logging.info(f'{result} in {time.time() - start_time}s')
        # Successful detections are "face{N}"
        if not any('face' in res for res in result):
            continue
        if start_time - last_open_door_time > 5:
            open_door()
            with open(f'images/{start_time}_OK.jfif', 'wb') as f:
                f.write(image)
            last_open_door_time = start_time
, , . , .. .
! , . , â , .. API . , 0.7 0.6 , .
30 FPS:
:
# Open the stream and read one frame from it.
vcap = cv2.VideoCapture(link)
success, frame = vcap.read()
, 30 FPS. : read()
. , , , . , , 30 â , .
: vcap.set(CV_CAP_PROP_BUFFERSIZE, 0);
. , OpenCV 3.4, - , . , StackOverflow â , ( , ).
ImageProcessor
3 :
class CameraBufferCleanerThread(threading.Thread):
    """Background reader that always keeps the most recently read frame.

    The thread starts itself on construction and calls ``camera.read()`` in
    a loop, storing the second element of each result in ``last_frame``.
    Used as a context manager, it stops and joins the thread on exit.
    """

    def __init__(self, camera, name='camera-buffer-cleaner-thread'):
        super().__init__(name=name)
        self.camera = camera
        self.last_frame = None
        self.finished = False
        self.start()

    def run(self):
        """Keep reading frames until ``finished`` is set."""
        while True:
            if self.finished:
                break
            _status, self.last_frame = self.camera.read()

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # Ask the reader loop to stop, then wait for it to actually finish.
        self.finished = True
        self.join()
class ImageProcessor:
    # <...>
    def process_stream(self, link):
        """Yield recognition results for the latest frame of a video stream.

        Opens *link* with OpenCV and samples the most recent frame roughly
        every 0.3 seconds, using a background reader thread so that stale
        frames are never processed.

        Yields:
            ``(labels, frame)`` when a frame is available, where *labels* is
            the result of ``self.process(frame)``; ``(None, None)`` when no
            frame is available yet.

        The generator is endless. When the consumer stops iterating and the
        generator is closed, the reader thread is joined and the capture is
        released.
        """
        vcap = cv2.VideoCapture(link)
        interval = 0.3  # ~3 FPS
        try:
            with CameraBufferCleanerThread(vcap) as cam_cleaner:
                while True:
                    frame = cam_cleaner.last_frame
                    if frame is not None:
                        yield (self.process(frame), frame)
                    else:
                        yield (None, None)
                    time.sleep(interval)
        finally:
            # The reader thread has been joined by now, so it is safe to
            # free the underlying stream; previously it was never released.
            vcap.release()
snapshot_based_intercom_id
:
def stream_based_intercom_id():
    """Watch the video stream and open the door when a known face appears.

    Every processed frame is logged together with the time elapsed since
    the previous one (at WARNING level when it took a second or more). When
    any label contains "face" and more than 5 seconds have passed since the
    last opening, the door is opened and the frame is saved under
    ``images/``.
    """
    processor = ImageProcessor()

    link = get_videostream_link()
    # To notify about delays
    last_time = time.time()
    last_open_door_time = time.time()
    for result, np_image in processor.process_stream(link):
        current_time = time.time()
        delta_time = current_time - last_time
        report = logging.info if delta_time < 1 else logging.warning
        report(f'{result} in {delta_time}')
        last_time = current_time

        # No frame was available for this tick.
        if result is None:
            continue
        # Nothing recognised in this frame.
        if not any('face' in res for res in result):
            continue
        # Door was opened too recently.
        if not current_time - last_open_door_time > 5:
            continue

        logging.warning(
            f'Hey, I know you - {result[0]}! Opening the door...')
        last_open_door_time = current_time
        open_door()
        cv2.imwrite(f'images/{current_time}_OK.jpg', np_image)
â , .
Telegram
/. .
python-telegram-bot
, callback / .
class TelegramInterface:
    """Telegram bot exposing /start, /run and /stop commands.

    /run and /stop forward the requested state (True / False) to
    ``state_callback`` and reply with the string it returns, but only for
    users whose username is in ``login_whitelist``; everyone else gets
    'not allowed'.
    """

    def __init__(self, login_whitelist, state_callback):
        self.state_callback = state_callback
        self.login_whitelist = login_whitelist
        self.updater = Updater(token="###", use_context=True)
        self.run()

    def run(self):
        """Register the command handlers and start polling for updates."""
        dispatcher = self.updater.dispatcher
        commands = (
            ("start", self.start),
            ("run", self.run_intercom),
            ("stop", self.stop_intercom),
        )
        for command, handler in commands:
            dispatcher.add_handler(CommandHandler(command, handler))
        self.updater.start_polling()

    def _reply_with_state(self, update, is_running):
        # Shared body of /run and /stop: the callback is invoked only for
        # whitelisted users, and its return value is sent back as the reply.
        message = update.message
        if message.from_user.username in self.login_whitelist:
            text = self.state_callback(is_running)
        else:
            text = 'not allowed'
        message.reply_text(text, reply_to_message_id=message.message_id)

    def run_intercom(self, update: Update, context: CallbackContext):
        """Handle /run: request the intercom loop to start."""
        self._reply_with_state(update, True)

    def stop_intercom(self, update: Update, context: CallbackContext):
        """Handle /stop: request the intercom loop to stop."""
        self._reply_with_state(update, False)

    def start(self, update: Update, context: CallbackContext) -> None:
        """Handle /start: greet the user."""
        update.message.reply_text('Hi!')
class TelegramBotThreadWrapper(threading.Thread):
    """Thread that creates the TelegramInterface bot.

    The thread starts itself on construction; the bot object is created
    inside ``run`` and stored in ``self.bot``.
    """

    def __init__(self, state_callback, name='telegram-bot-wrapper'):
        super().__init__(name=name)
        # Usernames allowed to control the intercom ("###" are placeholders).
        self.whitelist = ["###", "###"]
        self.state_callback = state_callback
        self.start()

    def run(self):
        """Create the bot with the whitelist and the state callback."""
        bot = TelegramInterface(self.whitelist, self.state_callback)
        self.bot = bot
intercom_id
, :
def stream_based_intercom_id_with_telegram():
    """Run the stream-based door opener, switched on and off via Telegram.

    The recognition loop runs in the calling thread and never returns. A
    TelegramBotThreadWrapper is given ``state_callback``, which flips the
    desired state and blocks until the loop has acknowledged the change.
    The loop starts in the stopped state.
    """
    processor = ImageProcessor()
    # One lock guards all shared state below; both conditions are built on it.
    loop_state_lock = threading.Lock()
    # Desired state, written by state_callback and read by the loop.
    loop_should_run = False
    loop_should_change_state_cv = threading.Condition(loop_state_lock)
    # Actual state, written by the loop and awaited by state_callback.
    is_loop_finished = True
    loop_changed_state_cv = threading.Condition(loop_state_lock)

    def stream_processing_loop():
        """Wait for a start request, process the stream until asked to stop, repeat."""
        nonlocal loop_should_run
        nonlocal loop_should_change_state_cv
        nonlocal is_loop_finished
        nonlocal loop_changed_state_cv
        while True:
            with loop_should_change_state_cv:
                # Sleep until state_callback(True) sets loop_should_run.
                loop_should_change_state_cv.wait_for(lambda: loop_should_run)
                is_loop_finished = False
                # Wake the state_callback call waiting for the loop to start.
                loop_changed_state_cv.notify_all()
                logging.warning(f'Loop is started')
            # A fresh stream link is requested on every (re)start.
            link = get_videostream_link()
            last_time = time.time()
            last_open_door_time = time.time()
            for result, np_image in processor.process_stream(link):
                with loop_should_change_state_cv:
                    # Check for a stop request once per yielded frame.
                    if not loop_should_run:
                        is_loop_finished = True
                        # Wake the state_callback call waiting for the stop.
                        loop_changed_state_cv.notify_all()
                        logging.warning(f'Loop is stopped')
                        break
                current_time = time.time()
                delta_time = current_time - last_time
                # Iterations that took a second or more are logged as warnings.
                if delta_time < 1:
                    logging.info(f'{result} in {delta_time}')
                else:
                    logging.warning(f'{result} in {delta_time}')
                last_time = current_time
                # process_stream yields (None, None) when no frame is available.
                if result is None:
                    continue
                if any(['face' in res for res in result]):
                    # Open the door at most once every 5 seconds.
                    if current_time - last_open_door_time > 5:
                        logging.warning(f'Hey, I know you - {result[0]}! Opening the door...')
                        last_open_door_time = current_time
                        open_door()
                        cv2.imwrite(f'images/{current_time}_OK.jpg', np_image)

    def state_callback(is_running):
        """Request the loop to run (True) or stop (False); return a status message.

        Blocks until the loop has confirmed the new state, unless the
        requested state equals the current desired state.
        """
        nonlocal loop_should_run
        nonlocal loop_should_change_state_cv
        nonlocal is_loop_finished
        nonlocal loop_changed_state_cv
        with loop_should_change_state_cv:
            if is_running == loop_should_run:
                return "Intercom service state is not changed"
            loop_should_run = is_running
            if loop_should_run:
                # Wake the loop, then wait until it reports it has started.
                loop_should_change_state_cv.notify_all()
                loop_changed_state_cv.wait_for(lambda: not is_loop_finished)
                return "Intercom service is up"
            else:
                # The loop notices the flag on its next frame; wait for that.
                loop_changed_state_cv.wait_for(lambda: is_loop_finished)
                return "Intercom service is down"

    # The wrapper starts its own thread on construction.
    telegram_bot = TelegramBotThreadWrapper(state_callback)
    logging.warning("Bot is ready")
    # Runs forever in the calling thread.
    stream_processing_loop()
:
Malgré les possibilités que la technologie d'interphone intelligent apporte aux résidents, des centaines (des milliers ?) de portes d'entrée sont désormais équipées de caméras et de microphones (oui, il y a du son dans le flux vidéo reçu au hasard !), ce qui ouvre de nouvelles possibilités de violation de la vie privée.
Je préférerais que l'accès au flux vidéo ne soit fourni qu'au moment d'un appel à l'appartement et que l'enregistrement glissant de trois jours, présenté comme un moyen de détecter les infractions, soit stocké non pas sur les serveurs de l'entreprise, mais directement dans l'interphone, avec la possibilité d'y accéder sur demande. Ou pas du tout.