素直にMJPG形式で配信される動画をOpenCVで表示させる
import cv2
url = f"http://192.168.0.10/mjpg/video.mjpg"
cap = cv2.VideoCapture(url)
while(True):
try:
ret, frame = cap.read()
if ret:
cv2.imshow("Video", frame)
cv2.waitKey(1)
else:raise
except:
cap.release()
cv2.destroyAllWindows()
break
次にフレームレートを表示するため、tkinterの無限ループを実装する。
移動平均でフレームレートを平滑化していること、signal
で KeyboardInterrupt をキャッチさせている。
import tkinter as tk
import time
import numpy as np
import signal
current_time = 0
window_length = 200
nframe = 0
times = [1 for _ in range(window_length)]
root = tk.Tk()
root.title("Frame rate")
label = tk.Label(root, font=('Helvetica', 48))
label.pack(padx=20, pady=20)
def handler(event):
root.destroy()
def update_gui():
global current_time, nframe
times[nframe%window_length] = time.time() - current_time
current_time = time.time()
nframe+=1
try:label.config(text=f"{1/np.mean(times):.0f} Hz")
except:return
root.after(1, update_gui)
signal.signal(signal.SIGINT, lambda x,y : print('terminal ^C') or handler(None))
update_gui()
root.mainloop()
組み合わせてみよう。
import cv2
import tkinter as tk
import numpy as np
import time
import signal
current_time = 0
window_length = 20
nframe = 0
times = [1 for _ in range(window_length)]
root = tk.Tk()
root.title("Frame rate")
label = tk.Label(root, font=('Helvetica', 48))
label.pack(padx=20, pady=20)
def handler(event):
root.destroy()
cv2.destroyAllWindows()
def close_event():
handler(None)
def show_frame():
global current_time, nframe
ret, frame = cap.read()
if ret:
cv2.imshow("Video", frame)
cv2.waitKey(1)
times[nframe%window_length] = time.time() - current_time
current_time = time.time()
nframe+=1
try:label.config(text=f"{1/np.mean(times):.1f} Hz")
except:
cap.release()
cv2.destroyAllWindows()
return
root.after(1, show_frame)
url = "http://192.168.0.10/mjpg/video.mjpg"
cap = cv2.VideoCapture(url)
signal.signal(signal.SIGINT, lambda x,y : print('terminal ^C') or handler(None))
show_frame()
root.protocol("WM_DELETE_WINDOW", close_event)
root.mainloop()
PNGファイルで録画する。なお、1枚ずつ出力するとファイル数が膨大になるため、Nフレーム毎にSPNG形式で出力するコードとしている。フレームを落とさないように、出力部分はThreadPoolExecutor
で非同期実行にしている。
import cv2
import time
import datetime
import copy
from concurrent.futures import ThreadPoolExecutor
url = f"http://192.168.0.10/mjpg/video.mjpg"
cap = cv2.VideoCapture(url)
def write_spng(filename, images):
import struct
f = open(filename, "wb")
for frame in images:
ret, encoded = cv2.imencode(".png",frame)
f.write(struct.pack("Q", len(encoded)))
encoded.tofile(f)
f.close()
N = 300
images = []
current_time = time.time()
future = None
with ThreadPoolExecutor() as executor:
while(True):
ret, frame = cap.read()
if ret:
images.append(frame)
if len(images) == N:
filename = f"{datetime.datetime.now().strftime('%Y-%m%d_%H%M%S')}_{N/(time.time()-current_time):.3f}Hz.spng"
current_time = time.time()
print(future if future is None else future.result(), filename)
future = executor.submit(write_spng, filename, copy.deepcopy(images))
del images
images = []