Python发送邮件
import os
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.utils import formataddr
from email.header import Header
from configparser import ConfigParser


def send_message(receiver_email, subject, content, pic_path=None):
    """Send an e-mail over SMTP-over-SSL, optionally with image attachments.

    Account and server settings are read from the ``[Settings]`` section of
    ``config.ini`` in the current working directory (options ``send_email``,
    ``code``, ``smtp_server``, ``smtp_port``, ``sender_name`` and
    ``receiver_name``).

    Args:
        receiver_email: Address the message is delivered to.
        subject: Subject line of the message.
        content: Plain-text body of the message.
        pic_path: Image file path(s) to attach. May be ``None`` or empty
            (plain-text message), a single path, or an iterable of paths.

    Raises:
        configparser.NoSectionError, configparser.NoOptionError: If
            ``config.ini`` is missing or lacks a required setting. These are
            raised before the ``try`` block and therefore propagate.

    Errors raised while building or sending the message are caught and
    printed, not propagated.
    """
    # Normalise the attachment argument without sharing a mutable default.
    if not pic_path:
        image_paths = []
    elif isinstance(pic_path, (str, os.PathLike)):
        image_paths = [pic_path]
    else:
        image_paths = list(pic_path)

    config = ConfigParser()
    config.read('config.ini', encoding='utf-8')
    sender_email = config.get('Settings', 'send_email')
    auth_code = config.get('Settings', 'code')
    smtp_server = config.get('Settings', 'smtp_server')
    smtp_port = config.get('Settings', 'smtp_port')
    sender_name = config.get('Settings', 'sender_name')
    receiver_name = config.get('Settings', 'receiver_name')

    try:
        if not image_paths:
            # Text-only message; 'plain' selects the text/plain subtype.
            msg = MIMEText(content, 'plain', 'utf-8')
        else:
            msg = MIMEMultipart()
            msg.attach(MIMEText(content, 'plain', 'utf-8'))
            for image_path in image_paths:
                with open(image_path, 'rb') as img_file:
                    img = MIMEImage(img_file.read())
                img.add_header('Content-Disposition', 'attachment',
                               filename=os.path.basename(image_path))
                msg.attach(img)

        msg['From'] = formataddr((sender_name, sender_email))  # sender
        msg['To'] = formataddr((receiver_name, receiver_email))  # recipient
        msg['Subject'] = Header(subject, 'utf-8')  # subject line

        # The context manager sends QUIT and closes the socket on exit, and
        # nothing is touched if the connection could not be opened at all
        # (the previous ``finally: server.quit()`` raised NameError then).
        with smtplib.SMTP_SSL(smtp_server, smtp_port) as server:
            server.login(sender_email, auth_code)
            server.sendmail(sender_email, [receiver_email], msg.as_string())

        print("图片邮件发送成功" if image_paths else "邮件发送成功")
    except Exception as e:
        print(f"Error occurred: {e}")


if __name__ == "__main__":
    file_path = ["./pic/吗喽.jpg", "./pic/tree.png"]
    # NOTE(review): the receiver address below looks like a redacted
    # placeholder; replace it with a real address before running.
    send_message(receiver_email='[email protected]', subject="1", content="222", pic_path=file_path)
识别图片文字并计算坐标
from PIL import Image
from cnocr import CnOcr

# Load the source image and cut out the region that is fed to OCR.
image = Image.open('img/test001.png')
select_name = "Nic"
crop_area = (0, 603, 1211, 2526)  # crop box passed to Image.crop
cropped_image = image.crop(crop_area)

# Run OCR with the default CnOcr settings.
ocr = CnOcr()
out = ocr.ocr(cropped_image)

# Offsets that map crop-local coordinates back onto the full image.
offset_x = crop_area[0]
offset_y = crop_area[1]

for entry in out:
    if select_name not in entry["text"]:
        continue
    corners = entry["position"]
    first_corner = corners[0]
    third_corner = corners[2]
    # Midpoint between the first and third corner of the detected box.
    center_x = (first_corner[0] + third_corner[0]) // 2
    center_y = (first_corner[1] + third_corner[1]) // 2
    # Shift from crop coordinates to coordinates in the original image.
    x_abs = int(center_x + offset_x)
    y_abs = int(center_y + offset_y)
    print("原图中的绝对坐标:", x_abs, y_abs)
水印
(1)图片水印
import cv2
import numpy as np

# Interactive image watermarking: two trackbars position the text, and ESC
# saves the watermarked image and exits.
img = cv2.imread("img/img.jpg")
if img is None:
    # cv2.imread signals failure by returning None instead of raising.
    raise FileNotFoundError("cv2.imread could not load 'img/img.jpg'")

cv2.namedWindow("img", cv2.WINDOW_NORMAL)
cv2.resizeWindow("img", 720, 960)
# Trackbar "x" spans the image height (shape[0]) and trackbar "y" spans the
# image width (shape[1]); they are combined below as position = (y, x).
cv2.createTrackbar("x", "img", 0, img.shape[0], lambda x: x)
cv2.createTrackbar("y", "img", 0, img.shape[1], lambda x: x)

# Text appearance does not change between frames, so define it once.
text = 'OpenCV'  # watermark text
font = cv2.FONT_HERSHEY_TRIPLEX
font_scale = 1.5  # text size
color = (255, 255, 255)  # white text
thickness = 2

while True:
    x = cv2.getTrackbarPos("x", "img")
    y = cv2.getTrackbarPos("y", "img")

    # Draw on a fresh copy so earlier positions do not accumulate.
    img_copy = img.copy()
    position = (y, x)  # text position
    cv2.putText(img_copy, text, position, font, font_scale, color, thickness)
    cv2.imshow('img', img_copy)

    k = cv2.waitKey(1) & 0xFF
    if k == 27:  # ESC: save the current preview and stop
        cv2.imwrite("img/watermark.jpg", img_copy)
        break

cv2.destroyAllWindows()
(2)视频水印
import cv2

# Interactive video watermarking: the text position is chosen on the first
# frame with two trackbars (ESC confirms), then every frame is stamped,
# displayed and written to the output file. Pressing 'q' stops early.
cap = cv2.VideoCapture("video/test.mp4")
FPS_show = True  # whether the position-selection preview is still pending
cv2.namedWindow("video", 0)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter('video/watermark_output.avi', fourcc, fps, (frame_width, frame_height))


class WaterText:
    """Draws the watermark text onto one image.

    The text settings are class attributes, so they are configured once and
    shared by every instance; each instance only carries the image to draw on.
    """

    text = None  # watermark text
    position = None  # text position passed to cv2.putText
    font = None  # font face
    font_scale = None  # text size
    font_color = None  # colour
    font_thickness = None  # stroke thickness

    def __init__(self, img):
        self.img = img

    def draw(self):
        """Draw the configured text onto ``self.img`` in place."""
        cv2.putText(self.img, WaterText.text, WaterText.position, WaterText.font, WaterText.font_scale,
                    WaterText.font_color, WaterText.font_thickness)


try:
    # Read the video frame by frame.
    while True:
        ret, frame = cap.read()  # ret: read succeeded, frame: image data
        if not ret:  # read failure or end of video
            print("Video has ended or cannot be read.")
            break

        if FPS_show:
            # Trackbar "x" spans the frame height and "y" the frame width;
            # they are combined as position = (y, x).
            cv2.createTrackbar("x", "video", 0, frame.shape[0], lambda x: x)
            cv2.createTrackbar("y", "video", 0, frame.shape[1], lambda x: x)
            WaterText.text = "Watermark"
            WaterText.font = cv2.FONT_HERSHEY_TRIPLEX
            WaterText.font_scale = 1.5
            WaterText.font_color = (255, 255, 255)
            WaterText.font_thickness = 2
            while True:
                x = cv2.getTrackbarPos("x", "video")
                y = cv2.getTrackbarPos("y", "video")
                WaterText.position = (y, x)
                video_copy = frame.copy()  # preview on a copy of the frame
                WaterText(video_copy).draw()
                cv2.imshow('video', video_copy)
                if cv2.waitKey(1) & 0xFF == 27:  # ESC confirms the position
                    break
            FPS_show = False
            cv2.destroyWindow("video")
            # A `continue` here would skip displaying/writing the first frame.

        WaterText(frame).draw()
        cv2.imshow("video", frame)
        out.write(frame)

        if cv2.waitKey(20) & 0xFF == ord('q'):
            break
finally:
    # Release resources. The writer must be released as well, otherwise the
    # output file may not be finalised.
    cap.release()
    out.release()
    cv2.destroyAllWindows()
提取视频字幕
"""
@description: Extract subtitles from a video.
"""
import os
from functools import partial
import cv2
from cnocr import CnOcr
import easyocr
from paddleocr import PaddleOCR
import pandas as pd
import Levenshtein


class VideoProcessor:
    """Extract subtitles from a video by running OCR on a selected region.

    Workflow (see ``run``): ``first_frame`` lets the user drag a rectangle on
    the first frame, ``read_video`` runs OCR on that region for a sample of
    frames, and ``write_result`` merges near-identical consecutive results
    and writes them to ``files/captions.txt``.
    """

    def __init__(self, video_path):
        self.cap = cv2.VideoCapture(video_path)
        self.ocr_results = []  # OCR results as (begin_frame, end_frame, text)
        self.fps_show = True  # whether the region-selection window is shown
        self.fps = 1  # number of the frame currently being processed
        self.drawing = False  # True while the mouse button is held down
        self.frame = None  # pristine first frame
        self.temp_frame = None  # scratch copy used for drawing
        self.rectangle_data = {
            'top_left': (0, 0),
            'bottom_right': (0, 0),
        }
        self.ocr = CnOcr()
        frame_count = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT))
        video_fps = int(self.cap.get(cv2.CAP_PROP_FPS))
        # Sample about three frames per second. The step is clamped to 1
        # because range() rejects a zero step when the reported FPS is < 3.
        step = max(1, video_fps // 3)
        self.frame_list = list(range(1, frame_count + 1, step))

    @staticmethod
    def preprocess_image(image):
        """Preprocess an image for OCR to improve recognition.

        Returns ``None`` when ``image`` is ``None``; otherwise a grayscale,
        contrast-adjusted and blurred copy.
        """
        if image is None:
            return None
        # Convert to grayscale.
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        # Adjust contrast and brightness.
        gray = cv2.convertScaleAbs(gray, alpha=1.5, beta=20)
        # Reduce noise.
        gray = cv2.GaussianBlur(gray, (3, 3), 0)
        return gray

    def draw_rectangle(self, event, x, y, flags, param):
        """Mouse callback that lets the user drag a rectangle on the frame.

        The corners are stored in ``self.rectangle_data``; the rectangle is
        always drawn on a fresh copy of ``self.frame``, which stays unmodified.
        """
        if event == cv2.EVENT_LBUTTONDOWN:
            # Start from a clean copy so any earlier rectangle disappears.
            self.temp_frame = self.frame.copy()
            self.drawing = True
            self.rectangle_data['top_left'] = (x, y)
            cv2.imshow("videoFirst", self.temp_frame)

        elif event == cv2.EVENT_MOUSEMOVE:
            if self.drawing:
                # Redraw the in-progress rectangle on a clean copy.
                self.temp_frame = self.frame.copy()
                cv2.rectangle(self.temp_frame, self.rectangle_data["top_left"], (x, y), (0, 255, 0), 1)
                cv2.imshow("videoFirst", self.temp_frame)

        elif event == cv2.EVENT_LBUTTONUP:
            # Record the final rectangle and draw it on a clean copy.
            self.drawing = False
            self.rectangle_data['bottom_right'] = (x, y)
            self.temp_frame = self.frame.copy()
            cv2.rectangle(self.temp_frame, self.rectangle_data["top_left"], self.rectangle_data["bottom_right"],
                          (0, 255, 0), 1)
            cv2.imshow("videoFirst", self.temp_frame)

    def first_frame(self):
        """Show the first frame and let the user select the subtitle region.

        The window stays open until ESC is pressed.

        Raises:
            RuntimeError: If the first frame cannot be read.
        """
        # TODO: pick the first frame that actually contains a subtitle.
        print("区域框选~~~")
        ret, self.frame = self.cap.read()
        if not ret:
            # read_video needs self.frame, so fail here with a clear message.
            raise RuntimeError("Could not read the first frame of the video")
        if self.fps_show:
            cv2.namedWindow("videoFirst")
            cv2.setMouseCallback("videoFirst", self.draw_rectangle)
            # Draw on a copy so the original frame stays untouched.
            self.temp_frame = self.frame.copy()

            while True:
                cv2.imshow("videoFirst", self.temp_frame)  # keep showing the frame
                key = cv2.waitKey(20) & 0xFF
                if key == 27:  # ESC ends the selection
                    break
            self.fps_show = False
            cv2.destroyWindow("videoFirst")

    def read_video(self):
        """Run OCR on the selected region of every sampled frame.

        Appends ``(frame_no, frame_no, text)`` tuples to ``self.ocr_results``
        and consumes ``self.frame_list``. When a frame yields no text, the most
        recently recognised text is recorded again; frames seen before any text
        has been recognised are skipped.
        """
        print("帧读取~~~")
        ax, ay = self.rectangle_data["top_left"]
        bx, by = self.rectangle_data["bottom_right"]
        # Normalise the corners and clamp the crop region to the frame.
        height, width = self.frame.shape[:2]
        x1, y1 = max(0, min(ax, bx)), max(0, min(ay, by))
        x2, y2 = min(width, max(ax, bx)), min(height, max(ay, by))

        # Build the OCR engine once instead of once per frame.
        engine = PaddleOCR(use_angle_cls=True, lang='ch')  # Chinese
        out_text = None  # last recognised text, carried over to empty frames

        while self.frame_list:
            # Pop first, so that every `continue` below still makes progress.
            frame_no = self.frame_list.pop(0)
            self.fps = frame_no
            self.cap.set(cv2.CAP_PROP_POS_FRAMES, frame_no)
            ret, frame = self.cap.read()
            if not ret or frame is None:
                print(f"第 {frame_no} 帧读取失败,跳过当前帧")
                continue

            # Crop the subtitle region.
            cropped_image = frame[y1:y2, x1:x2]
            if cropped_image.size == 0:
                print("裁剪区域无效,跳过当前帧")
                continue

            try:
                result = engine.ocr(cropped_image, cls=True)
                if result and result[0]:
                    out_text = " ".join([line[1][0] for block in result for line in block if line])
                if out_text is None:
                    continue  # nothing recognised yet
                print("----:" + out_text)
                self.ocr_results.append((frame_no, frame_no, out_text))
            except Exception as e:
                print(f"第 {self.fps} 帧OCR处理失败: {e}")

    @staticmethod
    def convert_seconds_to_hms(seconds):
        """Convert a number of seconds to ``hh:mm:ss.mmm``.

        The value is rounded to the nearest millisecond first, so float
        artefacts such as 2.3 -> ``.299`` cannot occur.
        """
        total_ms = int(round(seconds * 1000))
        total_seconds, milliseconds = divmod(total_ms, 1000)
        total_minutes, secs = divmod(total_seconds, 60)
        hours, minutes = divmod(total_minutes, 60)
        return f"{hours:02}:{minutes:02}:{secs:02}.{milliseconds:03}"

    def write_result(self):
        """Merge consecutive similar OCR results and write them to disk.

        Adjacent results whose Levenshtein distance is at most 2 and whose
        similarity is at least 0.85 are treated as the same caption. Each
        caption is written to ``files/captions.txt`` as ``begin,end,text``,
        with frame numbers converted to timestamps using the video's FPS.

        Raises:
            ValueError: If the capture does not report a positive FPS.
        """
        video_fps = self.cap.get(cv2.CAP_PROP_FPS)
        if not video_fps or video_fps <= 0:
            raise ValueError("Video FPS is unavailable; cannot compute timestamps")

        df = pd.DataFrame(self.ocr_results, columns=["FPS_begin", "FPS_end", "Text"])
        df["status"] = 0  # 1 = merged into an earlier row, 0 = kept

        main_index = 0
        next_index = 1
        for _ in range(len(df) - 1):
            text_1 = df.loc[main_index, "Text"]
            text_2 = df.loc[next_index, "Text"]
            distance = Levenshtein.distance(text_1, text_2)
            longest = max(len(text_1), len(text_2))
            # Two empty strings are identical; avoid dividing by zero.
            similarity = round(1 - distance / longest, 2) if longest else 1.0
            if distance <= 2 and similarity >= 0.85:
                # Same caption: extend the current row and drop the next one.
                df.loc[main_index, "FPS_end"] = df.loc[next_index, "FPS_end"]
                df.loc[main_index, "Text"] = df.loc[next_index, "Text"]
                df.loc[next_index, "status"] = 1
            else:
                main_index = next_index
            next_index += 1

        kept = df[df["status"] == 0]
        os.makedirs("files", exist_ok=True)
        with open(file="files/captions.txt", mode="w", encoding="utf-8") as f:
            for _, caption in kept.iterrows():
                # Convert frame numbers to hh:mm:ss.mmm using the real FPS.
                begin = VideoProcessor.convert_seconds_to_hms(int(caption["FPS_begin"]) / video_fps)
                end = VideoProcessor.convert_seconds_to_hms(int(caption["FPS_end"]) / video_fps)
                f.write(f"{begin},{end},{caption['Text']}\n")

    def close(self):
        """Release the video capture."""
        self.cap.release()


def run():
    """Run the whole pipeline on ``video/1.mp4``."""
    video_processor = VideoProcessor("video/1.mp4")
    try:
        video_processor.first_frame()
        video_processor.read_video()
        video_processor.write_result()
    finally:
        # Release the capture and close windows even if a step fails.
        video_processor.close()
        cv2.destroyAllWindows()


if __name__ == "__main__":
    try:
        run()
    except Exception as e:
        print(e)
转换音频格式
import os
import time
import subprocess
from shutil import copyfile
from tkinter import filedialog, messagebox
from wyw.音频格式转换 import tkinter_window
from itertools import chain


class AudioFormat:
    """Convert the file chosen in the tkinter dialog to several audio formats.

    The input is first converted (or copied) to WAV, and that WAV is then
    encoded to MP3, AAC, Opus, FLAC and ALAC. All output goes to
    ``./finally/<timestamp>/`` and the outcome of each step is collected in
    ``self.log``.
    """

    def __init__(self):
        self.return_data = tkinter_window.main()
        # Base name of the input file without its last extension.
        self.file_name = os.path.splitext(os.path.basename(self.return_data['file_path']))[0]
        self.now_time = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
        self.log = []

    def run_result(self, status, input_type, output_type=None):
        """Append a success/error line for one conversion to ``self.log``.

        Args:
            status: Process return code; 0 means success.
            input_type: Label of the source format.
            output_type: Label of the target format.
        """
        outcome = "success" if status == 0 else "error"
        self.log.append(f"{outcome}:{input_type}=>{output_type}")

    def _output_path(self, extension):
        """Return the output path for this run with the given extension."""
        return f"./finally/{self.now_time}/{self.file_name}.{extension}"

    def _run_ffmpeg(self, args, input_type, output_type):
        """Run ``ffmpeg`` with ``args`` and record the result in the log."""
        cmd = ["ffmpeg", *args]
        try:
            # Pass the argument list directly, without a shell. With
            # shell=True on POSIX only "ffmpeg" would be executed and every
            # other argument would be handed to the shell instead.
            status = subprocess.run(cmd).returncode
        except OSError:
            # ffmpeg missing or not executable: log it as a failed conversion.
            status = -1
        self.run_result(status, input_type, output_type=output_type)

    def _custom_wav_args(self):
        """Build the codec/rate/channel arguments chosen in the dialog."""
        data = self.return_data
        args = ["-acodec", str(data.get('bit_depth') or "pcm_s16le")]
        if data.get('sampling_rate'):
            args += ["-ar", str(data['sampling_rate'])]
        # The dialog stores the channel count under 'channel'; accept both
        # spellings so the selection is not silently ignored.
        channels = data.get('channels') or data.get('channel')
        if channels:
            args += ["-ac", str(channels)]
        return args

    # Extract the audio track of a video as WAV.
    def mp4_to_wav(self, input_type):
        """Extract the audio of the selected video into the run's WAV file."""
        option = self.return_data["selected_option"]
        if option == "option1":
            codec_args = ["-acodec", "pcm_s16le"]
        elif option == "option2":
            codec_args = self._custom_wav_args()
        else:
            return
        # -vn drops the video stream.
        self._run_ffmpeg(
            ["-i", self.return_data['file_path'], "-vn", *codec_args, self._output_path("wav")],
            input_type, "wav",
        )

    # Convert audio in another format to WAV so later steps share one input.
    def audio_to_wav(self, input_type):
        """Convert the selected audio file into the run's WAV file."""
        option = self.return_data["selected_option"]
        if option == "option1":
            # NOTE(review): adpcm_ms is a lossy codec while mp4_to_wav uses
            # pcm_s16le here; confirm whether adpcm_ms is intended.
            codec_args = ["-acodec", "adpcm_ms"]
        elif option == "option2":
            codec_args = self._custom_wav_args()
        else:
            return
        self._run_ffmpeg(
            ["-i", self.return_data['file_path'], *codec_args, self._output_path("wav")],
            input_type, "wav",
        )

    # Convert the WAV to every supported target format.
    def wav_to_audio(self, input_type):
        """Encode the run's WAV file to MP3, AAC, Opus, FLAC and ALAC."""
        self.wav_to_mp3(input_type)
        self.wav_to_aac(input_type)
        self.wav_to_opus(input_type)
        self.wav_to_flac(input_type)
        self.wav_to_alac(input_type)

    def _convert_wav(self, codec, extension, input_type, output_type):
        """Encode the run's WAV file with ``codec`` into ``extension``."""
        self._run_ffmpeg(
            ["-i", self._output_path("wav"), "-acodec", codec, self._output_path(extension)],
            input_type, output_type,
        )

    # The methods below each convert to a single format.
    def wav_to_mp3(self, input_type):
        """Encode the WAV to MP3 (libmp3lame)."""
        self._convert_wav("libmp3lame", "mp3", input_type, "MP3")

    def wav_to_aac(self, input_type):
        """Encode the WAV to AAC."""
        self._convert_wav("aac", "aac", input_type, "aac")

    def wav_to_opus(self, input_type):
        """Encode the WAV to Opus (libopus)."""
        self._convert_wav("libopus", "opus", input_type, "opus")

    def wav_to_flac(self, input_type):
        """Encode the WAV to FLAC."""
        self._convert_wav("flac", "flac", input_type, "flac")

    def wav_to_alac(self, input_type):
        """Encode the WAV to ALAC in an .m4a container."""
        self._convert_wav("alac", "m4a", input_type, "alac")

    def file_check(self):
        """Dispatch on the input file's extension and run the conversions.

        Creates the output directory, converts according to the file type and
        finally shows the collected log in a message box.
        """
        # Create the output directory.
        os.makedirs(f"./finally/{self.now_time}/", exist_ok=True)
        # Determine the file type from the extension, ignoring case.
        extension = os.path.splitext(self.return_data['file_path'])[1].lstrip('.').lower()
        if extension in ["mp3", "flac", "aac", "opus", "alac"]:
            self.audio_to_wav(input_type=extension)
            self.wav_to_audio(input_type=extension)
        elif extension == 'wav':
            if self.return_data['selected_option'] == 'option1':
                # No re-encoding requested: copy the file into the output dir.
                copyfile(self.return_data['file_path'], self._output_path("wav"))
            else:
                self.audio_to_wav(input_type="wav")
            self.wav_to_audio(input_type="wav")
        elif extension in ["mp4", "avi", "mov", "mkv"]:
            self.mp4_to_wav(input_type=extension)
            self.wav_to_audio(input_type=extension)
        messagebox.showinfo("完成", "\n".join(map(str, self.log)))


def main():
    """Open the dialog, then convert the selected file."""
    audio_format = AudioFormat()
    audio_format.file_check()


if __name__ == '__main__':
    main()
import os
import tkinter as tk
from tkinter import filedialog, messagebox
from tkinter import ttk


class FileUploadApp:
    """Tkinter dialog that collects an input file and WAV encoding options.

    After the user presses the submit button, the choices are available in
    ``self.return_data`` and the main loop is stopped.
    """

    def __init__(self, root):
        self.submit_status = False
        self.text = None
        self.submit_button = None
        self.channel_combobox = None
        self.channel_label = None
        self.bit_depth_combobox = None
        self.bit_depth_label = None
        self.sampling_rate_combobox = None
        self.sampling_rate_label = None
        self.option2_radio = None
        self.option1_radio = None
        self.file_path_label = None
        self.upload_button = None
        self.root = root
        self.root.title("音频提取与解码")
        self.root.geometry("500x400")

        # Path of the selected file.
        self.file_path = None
        # Data handed back to the caller.
        self.return_data = {}

        # Variable backing the two radio buttons.
        self.option_var = tk.StringVar(value="option1")

        # Build the widgets.
        self.init_ui()

    def init_ui(self):
        """Create all widgets; option widgets are created but not packed yet."""
        # File selection.
        self.upload_button = tk.Button(self.root, text="上传文件", command=self.upload_file)
        self.upload_button.pack(pady=10)

        self.file_path_label = tk.Label(self.root, text="文件路径: ")
        self.file_path_label.pack()

        # Radio buttons (hidden until a file is chosen).
        self.option1_radio = tk.Radiobutton(
            self.root, text="不指定格式", variable=self.option_var, value="option1", command=self.on_option_change
        )
        self.option2_radio = tk.Radiobutton(
            self.root, text="指定音频格式", variable=self.option_var, value="option2", command=self.on_option_change
        )

        # Drop-down lists (hidden until option2 is selected).
        self.sampling_rate_label = tk.Label(self.root, text="采样率")
        self.sampling_rate_combobox = ttk.Combobox(self.root, values=["", "16000", "44100", "48000"],
                                                   state="readonly")
        self.sampling_rate_combobox.set("")  # default

        self.bit_depth_label = tk.Label(self.root, text="解码")
        self.bit_depth_combobox = ttk.Combobox(self.root,
                                               values=["pcm_u8", "pcm_s16le", "pcm_s24le", "pcm_f32le", "adpcm_ms"],
                                               state="readonly")
        self.bit_depth_combobox.set("pcm_s16le")  # default

        self.channel_label = tk.Label(self.root, text="声道")
        self.channel_combobox = ttk.Combobox(self.root, values=["", "单声道", "立体声"], state="readonly")
        self.channel_combobox.set("")  # default

        # Submit button.
        self.submit_button = tk.Button(self.root, text="提交", command=self.submit)
        self.submit_button.pack(side="bottom")

    def _option_widgets(self):
        """Return the option2 widgets in the order they are packed."""
        return (
            self.sampling_rate_label, self.sampling_rate_combobox,
            self.bit_depth_label, self.bit_depth_combobox,
            self.channel_label, self.channel_combobox,
        )

    def upload_file(self):
        """Ask for a file and show its path together with the radio buttons."""
        self.file_path = filedialog.askopenfilename(title="选择文件",
                                                    filetypes=[("mp3", "*.mp3"), ("mp4", "*.mp4"), ("wav", "*.wav")])
        if self.file_path:
            self.file_path_label.config(text=f"文件路径: {self.file_path}")
            # Reveal the two options.
            self.option1_radio.pack()
            self.option2_radio.pack()

    def on_option_change(self):
        """Show the drop-down lists for option2 and hide them otherwise."""
        show = self.option_var.get() == "option2"
        for widget in self._option_widgets():
            if show:
                widget.pack()
            else:
                widget.pack_forget()

    def submit(self):
        """Collect the selected values into ``return_data`` and stop the loop.

        Returns:
            The ``return_data`` dict, or ``None`` when no file was chosen.
        """
        if not self.file_path:
            messagebox.showwarning("警告", "请先上传文件!")
            return

        if self.text is None:
            # Create the status label only once, even on repeated submits.
            self.text = tk.Label(self.root, text="转码中..........", fg="red", font=('Times', 15, 'bold italic underline'))
            self.text.pack()
        self.root.update()  # force a redraw so the label becomes visible

        selected_option = self.option_var.get()

        # The drop-down values only apply when option2 is selected.
        if selected_option == "option2":
            sampling_rate = self.sampling_rate_combobox.get()
            bit_depth = self.bit_depth_combobox.get()
            channel = self.channel_combobox.get()
        else:
            sampling_rate = bit_depth = channel = None

        check_channel = {
            "单声道": 1,
            "立体声": 2
        }
        # Map the selected label to a channel count. The lookup key is the
        # selected value; an empty or missing selection yields None.
        channel_count = check_channel.get(channel) if channel else None

        self.return_data = {
            "file_path": self.file_path,  # input file path
            "selected_option": selected_option,  # chosen option
            "sampling_rate": sampling_rate,  # sampling rate
            "bit_depth": bit_depth,  # codec
            "channel": channel_count,
            # Same value under the key the converter reads ('channels').
            "channels": channel_count,
        }
        # NOTE(review): the original author observed that quit() does not
        # always seem to take effect, possibly interacting with the final
        # message box — unverified.
        self.root.quit()
        return self.return_data


def main():
    """Run the dialog and return the collected data (empty if not submitted)."""
    # Create the main window.
    root = tk.Tk()
    file_upload = FileUploadApp(root)
    root.mainloop()
    return file_upload.return_data