物理の駅 Physics station by 現役研究者

テクノロジーは共有されてこそ栄える

Python: OpenCVでMJPGを表示、tkinter でフレームレートを表示させる、録画する

素直にMJPG形式で配信される動画をOpenCVで表示させる

import cv2
 
url = f"http://192.168.0.10/mjpg/video.mjpg"
cap = cv2.VideoCapture(url) 
 
while(True):
    try:
        ret, frame = cap.read()
        if ret:
            cv2.imshow("Video", frame)
            cv2.waitKey(1)
        else:raise
 
    except:
        cap.release()
        cv2.destroyAllWindows()
        break

次にフレームレートを表示するため、tkinterの無限ループを実装する。

移動平均でフレームレートを平滑化していること、signal で KeyboardInterrupt をキャッチさせている。

import tkinter as tk
import time
import numpy as np
import signal

current_time = 0
window_length = 200
nframe = 0
times = [1 for _ in range(window_length)]

root = tk.Tk()
root.title("Frame rate")
label = tk.Label(root, font=('Helvetica', 48))
label.pack(padx=20, pady=20)

def handler(event):
    root.destroy()
    
def update_gui():
    global current_time, nframe
    times[nframe%window_length] = time.time() - current_time
    current_time = time.time()
    nframe+=1
    try:label.config(text=f"{1/np.mean(times):.0f} Hz")
    except:return
    
    root.after(1, update_gui)

    
signal.signal(signal.SIGINT, lambda x,y : print('terminal ^C') or handler(None))
update_gui()
root.mainloop()

組み合わせてみよう。

import cv2
import tkinter as tk
import numpy as np
import time
import signal

current_time = 0
window_length = 20
nframe = 0
times = [1 for _ in range(window_length)]

root = tk.Tk()
root.title("Frame rate")
label = tk.Label(root, font=('Helvetica', 48))
label.pack(padx=20, pady=20)

def handler(event):
    root.destroy()
    cv2.destroyAllWindows()

def close_event():
    handler(None)

def show_frame():
    global current_time, nframe
    ret, frame = cap.read()
    if ret:
        cv2.imshow("Video", frame)
        cv2.waitKey(1)

        times[nframe%window_length] = time.time() - current_time
        current_time = time.time()
        nframe+=1
        try:label.config(text=f"{1/np.mean(times):.1f} Hz")
        except:
            cap.release()
            cv2.destroyAllWindows()
            return
        
    root.after(1, show_frame)


url = "http://192.168.0.10/mjpg/video.mjpg"
cap = cv2.VideoCapture(url)

signal.signal(signal.SIGINT, lambda x,y : print('terminal ^C') or handler(None))
show_frame()
root.protocol("WM_DELETE_WINDOW", close_event)
root.mainloop()

PNGファイルで録画する。なお、1枚ずつ出力するとファイル数が膨大になるため、Nフレーム毎にSPNG形式で出力するコードとしている。フレームを落とさないように、出力部分はThreadPoolExecutorで非同期実行にしている。

import cv2
import time
import datetime
import copy
from concurrent.futures import ThreadPoolExecutor

url = f"http://192.168.0.10/mjpg/video.mjpg"
cap = cv2.VideoCapture(url) 

def write_spng(filename, images):
    import struct
    f = open(filename, "wb")
    for frame in images:
        ret, encoded = cv2.imencode(".png",frame)
        f.write(struct.pack("Q", len(encoded)))
        encoded.tofile(f)
    f.close()

N = 300
images = []
current_time = time.time()
future = None

with ThreadPoolExecutor() as executor:
    while(True):
        ret, frame = cap.read()
        if ret:
            images.append(frame)
            if len(images) == N:
                filename = f"{datetime.datetime.now().strftime('%Y-%m%d_%H%M%S')}_{N/(time.time()-current_time):.3f}Hz.spng"
                current_time = time.time()
                print(future if future is None else future.result(), filename)
                future = executor.submit(write_spng, filename, copy.deepcopy(images))
                del images
                images = []

ダブルクリックすると表示されている画像を保存する機能もつけてみよう。本当にそのフレームが保存できているのか確認するためにフレーム番号をテキストで書き込み、保存するときのデフォルト名はクリック時点での日時にした。

import cv2
import tkinter as tk
from tkinter import filedialog
from datetime import datetime

root = tk.Tk()
root.withdraw()

url = f"http://192.168.0.10/mjpg/video.mjpg"
cap = cv2.VideoCapture(url) 

frame = None

def on_double_click(event, x, y, flags, param):
    if event == cv2.EVENT_LBUTTONDBLCLK:
        save_filename = filedialog.asksaveasfilename(
            defaultextension=".png", 
            filetypes=[("PNG files", "*.png")],
            initialfile=datetime.now().strftime("%Y-%m%d_%H-%M-%S.png"))
        if save_filename:
            cv2.imwrite(save_filename, frame)
            print("Clicked area saved as", save_filename)

first = True

def put_text(frame, i):
    position = (10, 30)
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 1
    font_color = (255, 255, 255)
    thickness = 2
    cv2.putText(frame, f"No.{i:04d}", position, font, font_scale, font_color, thickness)

i = 0
while(True):
    try:
        ret, frame = cap.read()
        if ret:
            put_text(frame, i) #保存したフレームを確認するためフレーム番号を画像に書き込む
            cv2.imshow("Video", frame)
            i += 1
            if first:
                cv2.setMouseCallback("Video", on_double_click)
                first = False
            cv2.waitKey(1)
        else:raise
 
    except:
        cap.release()
        cv2.destroyAllWindows()
        break