素直にMJPG形式で配信される動画をOpenCVで表示させる
import cv2 url = f"http://192.168.0.10/mjpg/video.mjpg" cap = cv2.VideoCapture(url) while(True): try: ret, frame = cap.read() if ret: cv2.imshow("Video", frame) cv2.waitKey(1) else:raise except: cap.release() cv2.destroyAllWindows() break
次にフレームレートを表示するため、tkinterの無限ループを実装する。
移動平均でフレームレートを平滑化していること、signal
で KeyboardInterrupt をキャッチさせている。
import tkinter as tk import time import numpy as np import signal current_time = 0 window_length = 200 nframe = 0 times = [1 for _ in range(window_length)] root = tk.Tk() root.title("Frame rate") label = tk.Label(root, font=('Helvetica', 48)) label.pack(padx=20, pady=20) def handler(event): root.destroy() def update_gui(): global current_time, nframe times[nframe%window_length] = time.time() - current_time current_time = time.time() nframe+=1 try:label.config(text=f"{1/np.mean(times):.0f} Hz") except:return root.after(1, update_gui) signal.signal(signal.SIGINT, lambda x,y : print('terminal ^C') or handler(None)) update_gui() root.mainloop()
組み合わせてみよう。
import cv2 import tkinter as tk import numpy as np import time import signal current_time = 0 window_length = 20 nframe = 0 times = [1 for _ in range(window_length)] root = tk.Tk() root.title("Frame rate") label = tk.Label(root, font=('Helvetica', 48)) label.pack(padx=20, pady=20) def handler(event): root.destroy() cv2.destroyAllWindows() def close_event(): handler(None) def show_frame(): global current_time, nframe ret, frame = cap.read() if ret: cv2.imshow("Video", frame) cv2.waitKey(1) times[nframe%window_length] = time.time() - current_time current_time = time.time() nframe+=1 try:label.config(text=f"{1/np.mean(times):.1f} Hz") except: cap.release() cv2.destroyAllWindows() return root.after(1, show_frame) url = "http://192.168.0.10/mjpg/video.mjpg" cap = cv2.VideoCapture(url) signal.signal(signal.SIGINT, lambda x,y : print('terminal ^C') or handler(None)) show_frame() root.protocol("WM_DELETE_WINDOW", close_event) root.mainloop()
PNGファイルで録画する。なお、1枚ずつ出力するとファイル数が膨大になるため、Nフレーム毎にSPNG形式で出力するコードとしている。フレームを落とさないように、出力部分はThreadPoolExecutor
で非同期実行にしている。
import cv2 import time import datetime import copy from concurrent.futures import ThreadPoolExecutor url = f"http://192.168.0.10/mjpg/video.mjpg" cap = cv2.VideoCapture(url) def write_spng(filename, images): import struct f = open(filename, "wb") for frame in images: ret, encoded = cv2.imencode(".png",frame) f.write(struct.pack("Q", len(encoded))) encoded.tofile(f) f.close() N = 300 images = [] current_time = time.time() future = None with ThreadPoolExecutor() as executor: while(True): ret, frame = cap.read() if ret: images.append(frame) if len(images) == N: filename = f"{datetime.datetime.now().strftime('%Y-%m%d_%H%M%S')}_{N/(time.time()-current_time):.3f}Hz.spng" current_time = time.time() print(future if future is None else future.result(), filename) future = executor.submit(write_spng, filename, copy.deepcopy(images)) del images images = []
ダブルクリックすると表示されている画像を保存する機能もつけてみよう。本当にそのフレームが保存できているのか確認するためにフレーム番号をテキストで書き込み、保存するときのデフォルト名はクリック時点での日時にした。
import cv2 import tkinter as tk from tkinter import filedialog from datetime import datetime root = tk.Tk() root.withdraw() url = f"http://192.168.0.10/mjpg/video.mjpg" cap = cv2.VideoCapture(url) frame = None def on_double_click(event, x, y, flags, param): if event == cv2.EVENT_LBUTTONDBLCLK: save_filename = filedialog.asksaveasfilename( defaultextension=".png", filetypes=[("PNG files", "*.png")], initialfile=datetime.now().strftime("%Y-%m%d_%H-%M-%S.png")) if save_filename: cv2.imwrite(save_filename, frame) print("Clicked area saved as", save_filename) first = True def put_text(frame, i): position = (10, 30) font = cv2.FONT_HERSHEY_SIMPLEX font_scale = 1 font_color = (255, 255, 255) thickness = 2 cv2.putText(frame, f"No.{i:04d}", position, font, font_scale, font_color, thickness) i = 0 while(True): try: ret, frame = cap.read() if ret: put_text(frame, i) #保存したフレームを確認するためフレーム番号を画像に書き込む cv2.imshow("Video", frame) i += 1 if first: cv2.setMouseCallback("Video", on_double_click) first = False cv2.waitKey(1) else:raise except: cap.release() cv2.destroyAllWindows() break