物理の駅 Physics station by 現役研究者

テクノロジーは共有されてこそ栄える

Python: OpenCVでライン(線形)プロファイルを取得する

Windows11でおなじみの壁紙のラインプロファイル=直線上のピクセルの輝度値を取得してみよう。

参考にしたブログ ラインプロファイルの取得法【5/24 訂正】 #Python - Qiita

import cv2
import numpy as np
import matplotlib.pyplot as plt
import scipy

x0, y0 = 2000, 300
x1, y1 = 3000, 2300
num = int(round(np.hypot(x1-x0, y1-y0)))
xx, yy = np.linspace(x0, x1, num), np.linspace(y0, y1, num)

# OpenCVではBGRの順で読み込まれる
dst_cv = cv2.imread(r"C:\Windows\Web\Wallpaper\Windows\img0.jpg",-1) 
# BGR→RGB変換
dst_rgb = cv2.cvtColor(dst_cv, cv2.COLOR_BGR2RGB)
# GRAYスケールで読み込む
dst_gray = cv2.imread(r"C:\Windows\Web\Wallpaper\Windows\img0.jpg",0)
plt.imshow(dst_rgb)
plt.plot([x0, x1], [y0, y1], 'ro-',linewidth = 1, label='width=1')
plt.show()

# GRAYスケール画像でしか機能しない。座標指定順がYY→XXなのに注意
profile = scipy.ndimage.map_coordinates(dst_gray, np.vstack((yy,xx))) 
plt.plot(profile)
plt.show()

著作権が気になるのでプロファイルのプロットのみ掲載する。

docs.scipy.org

scipy.ndimage.map_coordinates のデフォルトは order=3なので、スプラインを3次関数でフィッティングしている。つまり、Cubic補間である。

単なる線形補間にしたい場合は、order=1 を指定すれば良い。

cv2.splitを使って色を分割すれば、各色ごとのプロファイルも取得できる。

Python: OpenCVでMJPGを表示、tkinter でフレームレートを表示させる、録画する

素直にMJPG形式で配信される動画をOpenCVで表示させる

import cv2
 
url = f"http://192.168.0.10/mjpg/video.mjpg"
cap = cv2.VideoCapture(url) 
 
while(True):
    try:
        ret, frame = cap.read()
        if ret:
            cv2.imshow("Video", frame)
            cv2.waitKey(1)
        else:raise
 
    except:
        cap.release()
        cv2.destroyAllWindows()
        break

次にフレームレートを表示するため、tkinterの無限ループを実装する。

移動平均でフレームレートを平滑化していること、signal で KeyboardInterrupt をキャッチさせている。

import tkinter as tk
import time
import numpy as np
import signal

current_time = 0
window_length = 200
nframe = 0
times = [1 for _ in range(window_length)]

root = tk.Tk()
root.title("Frame rate")
label = tk.Label(root, font=('Helvetica', 48))
label.pack(padx=20, pady=20)

def handler(event):
    root.destroy()
    
def update_gui():
    global current_time, nframe
    times[nframe%window_length] = time.time() - current_time
    current_time = time.time()
    nframe+=1
    try:label.config(text=f"{1/np.mean(times):.0f} Hz")
    except:return
    
    root.after(1, update_gui)

    
signal.signal(signal.SIGINT, lambda x,y : print('terminal ^C') or handler(None))
update_gui()
root.mainloop()

組み合わせてみよう。

import cv2
import tkinter as tk
import numpy as np
import time
import signal

current_time = 0
window_length = 20
nframe = 0
times = [1 for _ in range(window_length)]

root = tk.Tk()
root.title("Frame rate")
label = tk.Label(root, font=('Helvetica', 48))
label.pack(padx=20, pady=20)

def handler(event):
    root.destroy()
    cv2.destroyAllWindows()

def close_event():
    handler(None)

def show_frame():
    global current_time, nframe
    ret, frame = cap.read()
    if ret:
        cv2.imshow("Video", frame)
        cv2.waitKey(1)

        times[nframe%window_length] = time.time() - current_time
        current_time = time.time()
        nframe+=1
        try:label.config(text=f"{1/np.mean(times):.1f} Hz")
        except:
            cap.release()
            cv2.destroyAllWindows()
            return
        
    root.after(1, show_frame)


url = "http://192.168.0.10/mjpg/video.mjpg"
cap = cv2.VideoCapture(url)

signal.signal(signal.SIGINT, lambda x,y : print('terminal ^C') or handler(None))
show_frame()
root.protocol("WM_DELETE_WINDOW", close_event)
root.mainloop()

PNGファイルで録画する。なお、1枚ずつ出力するとファイル数が膨大になるため、Nフレーム毎にSPNG形式で出力するコードとしている。フレームを落とさないように、出力部分はThreadPoolExecutorで非同期実行にしている。

import cv2
import time
import datetime
import copy
from concurrent.futures import ThreadPoolExecutor

url = f"http://192.168.0.10/mjpg/video.mjpg"
cap = cv2.VideoCapture(url) 

def write_spng(filename, images):
    import struct
    f = open(filename, "wb")
    for frame in images:
        ret, encoded = cv2.imencode(".png",frame)
        f.write(struct.pack("Q", len(encoded)))
        encoded.tofile(f)
    f.close()

N = 300
images = []
current_time = time.time()
future = None

with ThreadPoolExecutor() as executor:
    while(True):
        ret, frame = cap.read()
        if ret:
            images.append(frame)
            if len(images) == N:
                filename = f"{datetime.datetime.now().strftime('%Y-%m%d_%H%M%S')}_{N/(time.time()-current_time):.3f}Hz.spng"
                current_time = time.time()
                print(future if future is None else future.result(), filename)
                future = executor.submit(write_spng, filename, copy.deepcopy(images))
                del images
                images = []

古いOSへSSH接続時のエラーへの対処

$ ssh -X physics@station
Unable to negotiate with 192.168.0.2 port 22: no matching key exchange method found. Their offer: diffie-hellman-group-exchange-sha1,diffie-hellman-group1-sha1

と出るときは ~/.ssh/config

Host station
    HostName 192.168.0.2
    User physics
    KexAlgorithms diffie-hellman-group-exchange-sha1,diffie-hellman-group1-sha1

と書く。ちなみに接続元のSSHのバージョン

$ ssh -V
OpenSSH_8.2p1 Ubuntu-4ubuntu0.4, OpenSSL 1.1.1f  31 Mar 2020

Python: 7zip用に既存の圧縮ファイルを分割する

クラウドストレージやアップローダーには、ファイルサイズに上限があることがある。その際、巨大なファイルを分割することが求められる。

巨大なファイルになりがちなのは圧縮ファイルなので、以下、圧縮ファイルを前提として話を進める。圧縮展開プログラムである7zipでは、zipファイルや7zファイルを、.001 .002などの拡張子(ファイル名の末尾)を使って分割することに対応している。GUIでは書庫をサイズで分割オプションから10gを設定、コマンドラインでは-v10g という引数を使ってファイルを10GBごとに分割できるが、既に作成してしまった圧縮ファイルの分割はできない。

以下の例では、既に作ってしまった巨大な圧縮ファイル large_file.7z を、10GBごとに、large_files ディレクトリに分割する例を示す。一時的にメモリ(RAM)に書き込むため、RAMの空き容量が 10GB以上ない場合はエラーが出て実行できない。エラー処理コードは含まない。

import os

def split_file(input_file, output_directory, split_size):
    index = 1
    written_bytes = 0
    file_size = os.path.getsize(input_file)
    print(file_size,"byte")
    with open(input_file, 'rb') as in_f:
        while True:
            output_file = os.path.join(output_directory, f"{os.path.basename(input_file)}.{index:03d}")
            print(output_file)
            copy_bytes = min(split_size, file_size - written_bytes)
            if copy_bytes==0:return
            with open(output_file, 'wb') as out_f:
                out_f.write(in_f.read(copy_bytes))
                written_bytes += copy_bytes
                index += 1

input_file = "large_file.7z"
output_directory = "split_files"
split_size = 10_000_000_000 # 10GB

split_file(input_file, output_directory, split_size)

Windows: GitHubへのログインをTokenを使って自動化する方法

Githubで要ログインなリポジトリにアクセスすると以下のGitHubへのログイン画面が出てくる。

ここで「Sign in with your browser」とやってもログインはできるものの、その度にWindowsに資格情報が追加されてしまいおすすめできない。ちなみにWindowsの資格情報マネージャが記録できる資格情報数には上限があり、上限を超えると色々トラブルが起きる。

それを回避できるのがTokenである。

この Personal access tokenをGitHub上で作ってみよう。

Settings -> Developer Settings へ移動する。Tokens(classic)をクリックし、「Generate new token (classic)」する。 直リンは→https://github.com/settings/tokens/newである。

メモ(なんでもよい)と期限 (1年が最長)を入れ、「repo」にチェックをいれる。

下の方にある「Generate token」とすると、トークンが表示される。ここで表示されるトークンは画面を遷移すると二度と表示できないので確実に記録しておこう。

画像上のトークンは削除ずみです。生成したトークンは、それだけで一定期間、各ユーザーがアクセスできるリポジトリへ自由にアクセスできてしまう。共用や漏洩はしないように気をつけよう。

期限切れになったり自分で削除すると、Gitのpush pullなどでエラーメッセージの後、最初の画面に遷移するので、新しくTokenを作って入力しよう。

Githubの資格情報は、コントロールパネル-->ユーザーアカウント-->資格情報マネージャー-->Windows 資格情報 --> 汎用資格情報 に記録される。アクセスに失敗すると自動的に削除される。