# This is an example that uses the websockets api to know when a prompt execution is done
# Once the prompt execution is done it downloads the images using the /history endpoint
import base64
import json
import random
import sys
import urllib.parse
import urllib.request
import uuid

import requests
import websocket  # NOTE: websocket-client (https://github.com/websocket-client/websocket-client)

server_address = "127.0.0.1:8188"
client_id = str(uuid.uuid4())
api_path = "https://canvas-api-test.anvil.app/_/api"
image_path = (
    "D:/Temp/ComfyUI_windows_portable_nvidia/ComfyUI_windows_portable/ComfyUI/output/"
)

def update_ai_image_task_status(row_id, new_status):
    # Define the URL for the API endpoint
    url = "{}/creation-module/ai-image/update-status".format(api_path)
    # Create a JSON payload
    payload = {"row_id": row_id, "new_status": new_status}
    # Make a POST request to the API endpoint with the JSON payload
    response = requests.post(url, json=payload)
    # Handle the response
    if response.status_code == 200:
        print("Status update was successful")
        return response.json()
    else:
        print("Status update failed")
        print("Status code:", response.status_code)
        print("Response:", response.text)
        return None

def get_ai_image_task(row_id):
    # Define the URL for the API endpoint
    url = "{}/creation-module/ai-image/{}".format(api_path, row_id)
    print("Constructed URL:", url)  # Print the URL for debugging
    # Make a GET request to the API endpoint
    response = requests.get(url)
    # Handle the response
    if response.status_code == 200:
        print("Request was successful")
        return response.json()
    else:
        print("Request failed")
        print("Status code:", response.status_code)
        print("Response:", response.text)
        return None

def find_image_and_convert_to_base64(image_path):
    # Read an image file from disk and return it as a base64-encoded string
    with open(image_path, "rb") as image_file:
        image_data = image_file.read()
    image_base64 = base64.b64encode(image_data).decode("utf-8")
    return image_base64

def upload_image_to_anvil(row_id, image_base64):
    url = "{}/creation-module/ai-image/upload-preview".format(api_path)
    payload = {"row_id": row_id, "image_base64": image_base64}
    response = requests.post(url, json=payload)
    if response.status_code == 200:
        print("Image uploaded successfully")
        # Advance the task status now that the preview has been stored
        update_ai_image_task_status(row_id=row_id, new_status=3)
        return response.json()
    else:
        print("Image upload failed")
        print("Status code:", response.status_code)
        print("Response:", response.text)
        return None

def load_debug_ai_scene_info():
    # open ai_scene_info.json from disk (debug fallback for local runs)
    with open(
        "D:/Git/ap-canvas-creation-module/04_stable_diffusion/ai_scene_info.json", "r"
    ) as f:
        ai_scene_info = json.load(f)
    return ai_scene_info

def convert_base64_string_to_object(base64_string):
    # Decode a base64-encoded JSON string into a Python object
    # (renamed locals so they no longer shadow the bytes/str builtins)
    decoded_bytes = base64.b64decode(base64_string)
    decoded_string = decoded_bytes.decode("ascii")
    return json.loads(decoded_string)

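# For reference, the inverse of the helper above: the caller (presumably the
# launcher that starts this script) is assumed to build the argument roughly as
#
#   payload = base64.b64encode(json.dumps(ai_scene_info).encode("ascii")).decode("ascii")
#
# so that convert_base64_string_to_object(payload) round-trips it.
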
def set_filename(json_obj, title, new_prefix):
    # Recursively walk the workflow JSON; when the node whose _meta.title matches
    # `title` is found, overwrite its filename_prefix input
    for key, value in json_obj.items():
        if isinstance(value, dict):
            if value.get("_meta", {}).get("title") == title:
                if "inputs" in value and "filename_prefix" in value["inputs"]:
                    value["inputs"]["filename_prefix"] = new_prefix
                    return new_prefix
            else:
                result = set_filename(value, title, new_prefix)
                if result:
                    return result
    return None

def find_node(json_obj, title):
    # Recursively search the workflow JSON for the node whose _meta.title matches
    for key, value in json_obj.items():
        if isinstance(value, dict):
            if value.get("_meta", {}).get("title") == title:
                return value
            else:
                result = find_node(value, title)
                if result:
                    return result
    return None

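# Both helpers above walk a ComfyUI API-format workflow, which maps node ids to
# node dicts keyed by "class_type", "inputs", and "_meta". A minimal sketch of
# the expected shape (node ids and values here are illustrative, not taken from
# the actual workflow file):
#
#   {
#       "3": {
#           "class_type": "KSampler",
#           "_meta": {"title": "KSampler"},
#           "inputs": {"steps": 20, "cfg": 8.0},
#       },
#       "9": {
#           "class_type": "SaveImage",
#           "_meta": {"title": "Save Image"},
#           "inputs": {"filename_prefix": "ComfyUI", "images": ["8", 0]},
#       },
#   }
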
def queue_prompt(prompt):
    p = {"prompt": prompt, "client_id": client_id}
    data = json.dumps(p).encode("utf-8")
    req = urllib.request.Request("http://{}/prompt".format(server_address), data=data)
    return json.loads(urllib.request.urlopen(req).read())

def get_prompt(ai_scene_info):
    with open(
        "D://Git//ap-canvas-creation-module//04_stable_diffusion//workflows//canvas_3d_to_img_standard_V1.json",
        "r",
    ) as f:
        prompt_text_json = f.read()
    prompt = json.loads(prompt_text_json)

    # set the text prompt for our positive CLIPTextEncode
    positive_text = ai_scene_info["ai_scene"]["prompt"]["positive_prompt"]
    negative_text = ai_scene_info["ai_scene"]["prompt"]["negative_prompt"]

    base_path = "D://Git//ap-canvas-creation-module//03_blender//sd_blender//sample_scene//Renders//"
    # Note: distinct from the module-level image_path (the ComfyUI output dir)
    render_path = base_path + ai_scene_info["project_id"] + "/"
    image_base_path = render_path + "base0001.jpg"
    image_alpha_products_path = render_path + "alpha_products0001.jpg"
    # image_depth_path = render_path + "depth0001.png"

    set_filename(
        prompt,
        "Save Image",
        "{project_id}/basic_api_example".format(project_id=ai_scene_info["project_id"]),
    )

    ksampler_main = find_node(prompt, "KSampler")
    ksampler_main["inputs"]["noise_seed"] = random.randint(0, 1000000)
    ksampler_main["inputs"]["steps"] = ai_scene_info["ai_scene"]["settings"]["steps"]
    ksampler_main["inputs"]["cfg"] = ai_scene_info["ai_scene"]["settings"]["cfg"]

    prompt_positive = find_node(prompt, "positive_CLIPTextEncodeSDXL")
    prompt_positive["inputs"]["text_g"] = positive_text
    prompt_positive["inputs"]["text_l"] = positive_text

    prompt_negative = find_node(prompt, "negative_CLIPTextEncodeSDXL")
    prompt_negative["inputs"]["text_g"] = negative_text
    prompt_negative["inputs"]["text_l"] = negative_text

    image_base = find_node(prompt, "image_base")
    image_base["inputs"]["image"] = image_base_path

    image_product_mask = find_node(prompt, "image_product_mask")
    image_product_mask["inputs"]["image"] = image_alpha_products_path

    image_depth = find_node(prompt, "image_depth")
    # image_depth["inputs"]["image"] = image_depth_path

    return prompt

def get_image(filename, subfolder, folder_type):
    data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
    url_values = urllib.parse.urlencode(data)
    with urllib.request.urlopen(
        "http://{}/view?{}".format(server_address, url_values)
    ) as response:
        return response.read()

def get_history(prompt_id):
    with urllib.request.urlopen(
        "http://{}/history/{}".format(server_address, prompt_id)
    ) as response:
        return json.loads(response.read())

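# /history/<prompt_id> returns a mapping keyed by prompt id; each entry carries
# an "outputs" dict keyed by node id, whose "images" lists hold the
# filename/subfolder/type fields that get_images and get_image consume below.
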
def get_images(ws, prompt):
    prompt_id = queue_prompt(prompt)["prompt_id"]
    output_images = {}
    while True:
        out = ws.recv()
        if isinstance(out, str):
            message = json.loads(out)
            if message["type"] == "executing":
                data = message["data"]
                if data["node"] is None and data["prompt_id"] == prompt_id:
                    break  # Execution is done
        else:
            continue  # previews are binary data

    history = get_history(prompt_id)[prompt_id]
    for node_id in history["outputs"]:
        node_output = history["outputs"][node_id]
        images_output = []
        if "images" in node_output:
            for image in node_output["images"]:
                image_data = get_image(
                    image["filename"], image["subfolder"], image["type"]
                )
                images_output.append(
                    {
                        "filename": image["filename"],
                        "data": image_data,
                        "type": image["type"],
                    }
                )
        output_images[node_id] = images_output
    return output_images

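# The wait loop above relies on ComfyUI's websocket protocol: JSON status frames
# such as
#   {"type": "executing", "data": {"node": "<node id>", "prompt_id": "..."}}
# are interleaved with binary preview frames (skipped by the else branch); a
# frame with "node": null for our prompt_id signals that execution has finished.
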
def main():
    argv = sys.argv
    try:
        # Arguments after the "--" separator carry the base64-encoded scene info
        argv = argv[argv.index("--") + 1 :]
        ai_scene_info = convert_base64_string_to_object(argv[0])
        row_id = ai_scene_info["image_id"]
        print("loading scene data", ai_scene_info)
    except Exception as e:
        print("Error:", e)
        # ai_scene_info = load_debug_ai_scene_info()
        return  # bail out: without scene info there is nothing to generate

    prompt = get_prompt(ai_scene_info)

    ws = websocket.WebSocket()
    ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id))
    # Move the task to status 2 while the prompt is being generated
    update_ai_image_task_status(row_id, 2)
    images = get_images(ws, prompt)

    for node_id in images:
        for image_info in images[node_id]:
            if image_info["type"] == "output":
                response = get_ai_image_task(row_id)
                data = json.loads(response["data"])
                project_id = data["project_id"]
                complete_image_path = image_path + "{}/{}".format(
                    project_id, image_info["filename"]
                )
                print(complete_image_path)
                image_base64 = find_image_and_convert_to_base64(complete_image_path)
                upload_image_to_anvil(row_id, image_base64)

if __name__ == "__main__":
    main()
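# Example invocation. The script expects its payload after a "--" separator;
# the script name and payload below are illustrative placeholders, not values
# from this repository:
#
#   python sd_comfy_client.py -- <base64_encoded_ai_scene_info>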