Python + ComfyUI:打造自动化AI绘画工作流

简介

ComfyUI 是一个功能强大的AI绘画工具,能够帮助用户快速生成高质量的图像。然而,在实际应用中,用户往往需要在工作流执行前后对输入图像进行预处理或后处理,例如调整尺寸、添加滤镜或进行格式转换。此外,用户可能希望自动从ChatGPT获取创意提示词来生成更具个性化的图像,或者将输出的图片按照特定的文件夹结构进行整理,以便于管理和检索。或者,用户可能希望将ComfyUI与聊天软件集成,通过简单的对话指令触发图像生成流程。凭借Python的灵活性和丰富的生态系统,用户可以轻松实现这些功能,打造高度自动化、智能化的AI绘画工作流,满足多样化的创作需求。

使用说明

  1. ComfyUI不限定版本,笔者使用的是秋叶启动器2.8.12;

  2. 保证你使用的工作流能够正常出图,python并不解决无模型、内存不足等问题;

  3. 将ComfyUI切换到开发者模式,保存工作流的API文件

代码

整体框架(图生图):

  • 上传图片到ComfyUI服务器,返回图片路径;

  • 在工作流中填充图片路径和随机种子;

  • 上传工作流并获取返回的任务id;

  • 等待任务结束,根据任务id拿到历史数据;

  • 根据历史数据拿到图片并保存。

上传图片

def upload_image(filepath):
    filename = os.path.basename(filepath)
    new_filepath = upload_folder + filename
    with open(filepath, "rb") as image_file:
        image_binary_data = image_file.read()
        files = {'image': (new_filepath, image_binary_data, 'image/png')}
        response = requests.post(f"http://{server_address}/upload/image", files=files)
    if response.status_code == 200:
        return new_filepath

upload_image接收图片的路径,然后将图片上传到ComfyUI的upload_folder文件夹下,接着返回了上传图片的路径。

解析并上传工作流

def queue_prompt(prompt):
    p = {"prompt": prompt, "client_id": client_id}
    data = json.dumps(p).encode('utf-8')
    req = urllib.request.Request(f"http://{server_address}/prompt", data=data)
    return json.loads(urllib.request.urlopen(req).read())

def parse_workflow(filepath, workflow):
    with open(workflow, 'r', encoding="utf-8") as workflow_api:
        prompt = json.load(workflow_api)
        # 设置图片输入
        for node in input_nodes:
            prompt[str(node)]["inputs"]["image"] = filepath
        # 设置噪声
        for node in noise_nodes:
            new_seed = generate_seed()
            if "noise_seed" in prompt[str(node)]["inputs"]:
                prompt[str(node)]["inputs"]["noise_seed"] = new_seed
            elif "seed" in prompt[str(node)]["inputs"]:
                prompt[str(node)]["inputs"]["seed"] = new_seed
            else:
                pass
        # 获取任务id
        prompt_id = queue_prompt(prompt)['prompt_id']
        return get_images(prompt_id)

parse_workflow将图片路径和随机种子填充到工作流中。input_nodes为输入节点列表、noise_nodes为噪声节点列表。每个工作流的input_nodes和noise_nodes都不同,需要自行查找,方法如下:

(关键步骤)在ComfyUI的web界面中找到工作流的输入节点,节点序号一般在右上角,添加到input_nodes中;

(关键步骤)在API文件中搜索seed关键字,如果有noise_seed或seed字段,就将这个节点序号添加到noise_nodes中。

获取历史数据并解析图片

def get_history(prompt_id):
    with urllib.request.urlopen(f"http://{server_address}/history/{prompt_id}") as response:
        return json.loads(response.read())

def get_images(prompt_id):
    ws = websocket.WebSocket()
    ws.connect(f"ws://{server_address}/ws?clientId={client_id}")
    logger.info('prompt_id:{}'.format(prompt_id))
    output_images = {}
    while True:
        out = ws.recv()
        if isinstance(out, str):
            message = json.loads(out)
            if message['type'] == 'executing':
                data = message['data']
                if data['node'] is None and data['prompt_id'] == prompt_id:
                    logger.info(f"prompt_id:{prompt_id} 任务已完成")
                    break
        else:
            continue  # 预览为二进制数据

    history = get_history(prompt_id)[prompt_id]
    for o in history['outputs']:
        if o in output_nodes:
            images = history['outputs'][o]['images']
            images_output = []
            for image in images:
                image_data = get_image(image['filename'], image['subfolder'], image['type'])
                images_output.append(image_data)
            output_images[o] = images_output
    logger.info(f"prompt_id:{prompt_id} 获取图片成功")
    return output_images

get_images打开了一个websocket连接,持续监听ComfyUI的任务完成情况,如果任务已经完成,就获取历史数据并按照给定的output_nodes获取输出图片。

(关键步骤)在ComfyUI的web界面中找到工作流的输出节点,节点序号一般在右上角,添加到output_nodes中;

完整实现

import json
import os
import random
import requests
import websocket
import uuid
import urllib.request
import urllib.parse
from loguru import logger


def generate_seed():
    return random.randint(10 ** 14, 10 ** 16 - 1)


# 定义一个函数向服务器队列发送提示信息
def queue_prompt(prompt):
    p = {"prompt": prompt, "client_id": client_id}
    data = json.dumps(p).encode('utf-8')
    req = urllib.request.Request(f"http://{server_address}/prompt", data=data)
    return json.loads(urllib.request.urlopen(req).read())


# 定义一个函数来获取图片
def get_image(filename, subfolder, folder_type):
    data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
    url_values = urllib.parse.urlencode(data)
    with urllib.request.urlopen(f"http://{server_address}/view?{url_values}") as response:
        return response.read()


def upload_image(filepath):
    filename = os.path.basename(filepath)
    new_filepath = upload_folder + filename
    with open(filepath, "rb") as image_file:
        image_binary_data = image_file.read()
        files = {'image': (new_filepath, image_binary_data, 'image/png')}
        response = requests.post(f"http://{server_address}/upload/image", files=files)
    if response.status_code == 200:
        return new_filepath


# 定义一个函数来获取历史记录
def get_history(prompt_id):
    with urllib.request.urlopen(f"http://{server_address}/history/{prompt_id}") as response:
        return json.loads(response.read())


# 定义一个函数来获取图片,这涉及到监听WebSocket消息
def get_images(prompt_id):
    ws = websocket.WebSocket()
    ws.connect(f"ws://{server_address}/ws?clientId={client_id}")
    logger.info('prompt_id:{}'.format(prompt_id))
    output_images = {}
    while True:
        out = ws.recv()
        if isinstance(out, str):
            message = json.loads(out)
            if message['type'] == 'executing':
                data = message['data']
                if data['node'] is None and data['prompt_id'] == prompt_id:
                    logger.info(f"prompt_id:{prompt_id} 任务已完成")
                    break
        else:
            continue  # 预览为二进制数据

    history = get_history(prompt_id)[prompt_id]
    for o in history['outputs']:
        if o in output_nodes:
            images = history['outputs'][o]['images']
            images_output = []
            for image in images:
                image_data = get_image(image['filename'], image['subfolder'], image['type'])
                images_output.append(image_data)
            output_images[o] = images_output
    logger.info(f"prompt_id:{prompt_id} 获取图片成功")
    return output_images


# 解析工作流并获取图片
def parse_workflow(filepath, workflow):
    with open(workflow, 'r', encoding="utf-8") as workflow_api:
        prompt = json.load(workflow_api)
        # 设置图片输入
        for node in input_nodes:
            prompt[str(node)]["inputs"]["image"] = filepath
        # 设置噪声
        for node in noise_nodes:
            new_seed = generate_seed()
            if "noise_seed" in prompt[str(node)]["inputs"]:
                prompt[str(node)]["inputs"]["noise_seed"] = new_seed
            elif "seed" in prompt[str(node)]["inputs"]:
                prompt[str(node)]["inputs"]["seed"] = new_seed
            else:
                pass
        # 获取任务id
        prompt_id = queue_prompt(prompt)['prompt_id']
        return get_images(prompt_id)


# 生成图像并显示
def generate(workflow, upload_path):
    filename = os.path.basename(upload_path)
    filepath = upload_image(upload_path)
    images = parse_workflow(filepath, workflow)

    for node_id in images:
        idx = 1
        for image_data in images[node_id]:
            from datetime import datetime
            timestamp = datetime.now().strftime("%m%d%H%M%S")

            # 使用格式化的时间戳在文件名中
            output_img_path = f"{output_folder}{node_id}_{filename}_{idx}_{timestamp}.jpg"

            with open(output_img_path, "wb") as binary_file:
                binary_file.write(image_data)

            idx += 1
            logger.info(f"{output_img_path} 保存完毕")


def process_images(input_folder, workflow):
    for filename in os.listdir(input_folder):
        filepath = os.path.join(input_folder, filename)
        if os.path.isfile(filepath) and filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
            for i in range(repeat_times):
                generate(workflow, filepath)


if __name__ == "__main__":
    # 输入输出文件夹
    upload_folder = "./upload/"
    output_folder = "./output/"
    input_folder = "./input/"
    # ComfyUI地址
    COMFYUI_ENDPOINT = '127.0.0.1:8188'
    server_address = COMFYUI_ENDPOINT
    # 工作流API文件
    workflow = '动漫转真人.json'
    # 输入输出节点
    input_nodes = ['112']
    output_nodes = ['94']
    noise_nodes = ['63', '65', '83']
    # 其他设置
    repeat_times = 1

    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    if not os.path.exists(input_folder):
        os.makedirs(input_folder)
    client_id = str(uuid.uuid4())
    process_images(input_folder, workflow)
    logger.info("任务已完成")

再次提醒:

  1. 这是一份用于图生图的代码,如果要实现文生图或者其他功能,请自行修改;

  2. 代码中需要自行修改COMFYUI_ENDPOINT、input_nodes、output_nodes和noise_nodes;

  3. 输入的文件放到input文件夹下,输出的文件会保存到output文件夹中;

参考文献

使用python调用comfyui-api,实现出图自由 - 知乎

【ComfyUI的API接口调用示例】_comfyui api调用例子-CSDN博客


Python + ComfyUI:打造自动化AI绘画工作流
https://blog.nasxyz.top/archives/4564e3cb-f261-4aa1-8fa4-db1782ffbdc6
作者
kanochi
发布于
2025年02月15日
更新于
2025年02月15日
许可协议