# This is an example that uses the websockets api to know when a prompt execution is done
# Once the prompt execution is done it downloads the images using the /history endpoint

import base64
import io
import json
import random
import sys
import urllib.parse
import urllib.request
import uuid

import requests
import websocket  # NOTE: websocket-client (https://github.com/websocket-client/websocket-client)
from PIL import Image

server_address = "127.0.0.1:8188"
client_id = str(uuid.uuid4())
api_path = "https://canvas-api-test.anvil.app/_/api"
image_path = (
    "D:/Temp/ComfyUI_windows_portable_nvidia/ComfyUI_windows_portable/ComfyUI/output/"
)

# Timeout (seconds) applied to every call made through `requests`, so that an
# unresponsive API cannot block the script forever.
REQUEST_TIMEOUT_SECONDS = 60


def _send_request(send, url, failure_message, **kwargs):
    """Call ``send(url, ...)`` with a timeout; return the response or None.

    ``send`` is ``requests.get`` or ``requests.post``. Transport-level errors
    (connection refused, timeout, ...) are printed and turned into ``None`` so
    callers can treat them like any other failed request.
    """
    try:
        return send(url, timeout=REQUEST_TIMEOUT_SECONDS, **kwargs)
    except requests.RequestException as exc:
        print(failure_message)
        print("Error:", exc)
        return None


def _report_http_failure(message, response):
    """Print ``message`` followed by the status code and body of ``response``."""
    print(message)
    print("Status code:", response.status_code)
    print("Response:", response.text)


def update_ai_image_task_status(row_id, new_status):
    """POST a new status for the AI image task identified by ``row_id``.

    Returns the decoded JSON response on HTTP 200, otherwise prints the
    failure details and returns ``None``.
    """
    url = "{}/creation-module/ai-image/update-status".format(api_path)
    payload = {"row_id": row_id, "new_status": new_status}

    response = _send_request(requests.post, url, "Status update failed", json=payload)
    if response is None:
        return None

    if response.status_code == 200:
        print("Status update was successful")
        return response.json()

    _report_http_failure("Status update failed", response)
    return None


def get_ai_image_task(row_id):
    """GET the AI image task identified by ``row_id``.

    Returns the decoded JSON response on HTTP 200, otherwise prints the
    failure details and returns ``None``.
    """
    url = "{}/creation-module/ai-image/{}".format(api_path, row_id)
    print("Constructed URL:", url)  # Print the URL for debugging

    response = _send_request(requests.get, url, "Request failed")
    if response is None:
        return None

    if response.status_code == 200:
        print("Request was successful")
        return response.json()

    _report_http_failure("Request failed", response)
    return None


def find_image_and_convert_to_base64(image_path):
    """Read the file at ``image_path`` and return its bytes as a base64 string."""
    with open(image_path, "rb") as image_file:
        image_data = image_file.read()
    return base64.b64encode(image_data).decode("utf-8")


def upload_image_to_anvil(row_id, image_base64):
    """POST a base64-encoded preview image for the task ``row_id``.

    On HTTP 200 the task status is additionally set to 3 and the decoded JSON
    response is returned; otherwise the failure is printed and ``None`` is
    returned.
    """
    url = "{}/creation-module/ai-image/upload-preview".format(api_path)
    payload = {"row_id": row_id, "image_base64": image_base64}

    response = _send_request(requests.post, url, "Image upload failed", json=payload)
    if response is None:
        return None

    if response.status_code == 200:
        print("Image uploaded successfully")
        update_ai_image_task_status(row_id=row_id, new_status=3)
        return response.json()

    _report_http_failure("Image upload failed", response)
    return None


def load_debug_ai_scene_info():
    """Load the scene description from the local ai_scene_info.json debug file."""
    # JSON is UTF-8 by specification; do not depend on the platform default.
    with open(
        "D:/Git/ap-canvas-creation-module/04_stable_diffusion/ai_scene_info.json",
        "r",
        encoding="utf-8",
    ) as f:
        return json.load(f)


def convert_base64_string_to_object(base64_string):
    """Decode a base64 string containing JSON text and return the parsed object.

    The decoded bytes are interpreted as UTF-8 (a superset of ASCII), so
    non-ASCII characters in the payload are accepted.
    """
    raw = base64.b64decode(base64_string)
    return json.loads(raw.decode("utf-8"))


def set_filename(json_obj, title, new_prefix):
    """Set ``inputs.filename_prefix`` on the first node titled ``title``.

    Walks the dict values of ``json_obj`` depth-first. A dict whose
    ``_meta.title`` equals ``title`` is updated in place when it has an
    ``inputs.filename_prefix`` entry; dicts with a different title are searched
    recursively. Returns ``new_prefix`` once a node was updated, else ``None``.
    """
    for value in json_obj.values():
        if not isinstance(value, dict):
            continue
        if value.get("_meta", {}).get("title") == title:
            if "inputs" in value and "filename_prefix" in value["inputs"]:
                value["inputs"]["filename_prefix"] = new_prefix
                return new_prefix
        else:
            result = set_filename(value, title, new_prefix)
            # Compare against None so an empty-string prefix still counts as a hit.
            if result is not None:
                return result
    return None


def find_node(json_obj, title):
    """Return the first nested dict whose ``_meta.title`` equals ``title``.

    Walks the dict values of ``json_obj`` depth-first and returns ``None`` when
    no matching dict is found.
    """
    for value in json_obj.values():
        if not isinstance(value, dict):
            continue
        if value.get("_meta", {}).get("title") == title:
            return value
        result = find_node(value, title)
        if result is not None:
            return result
    return None


def _require_node(prompt, title):
    """Like ``find_node`` but raise ``KeyError`` naming the missing title."""
    node = find_node(prompt, title)
    if node is None:
        raise KeyError("workflow has no node titled {!r}".format(title))
    return node


def queue_prompt(prompt):
    """Submit ``prompt`` to the server's /prompt endpoint and return its JSON reply."""
    payload = {"prompt": prompt, "client_id": client_id}
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request("http://{}/prompt".format(server_address), data=data)
    # Use the response as a context manager so the connection is released.
    with urllib.request.urlopen(req) as response:
        return json.loads(response.read())


def get_prompt(ai_scene_info):
    """Load the workflow JSON and fill it in from ``ai_scene_info``.

    Reads ``project_id`` and ``ai_scene`` (``prompt.positive_prompt``,
    ``prompt.negative_prompt``, ``settings.steps``, ``settings.cfg``) from
    ``ai_scene_info`` and writes them, a random noise seed, the output filename
    prefix and the input image paths into the matching workflow nodes.

    Raises ``KeyError`` if a required node title is missing from the workflow.
    """
    with open(
        "D://Git//ap-canvas-creation-module//04_stable_diffusion//workflows//canvas_3d_to_img_standard_V1.json",
        "r",
        encoding="utf-8",
    ) as f:
        prompt = json.load(f)

    positive_text = ai_scene_info["ai_scene"]["prompt"]["positive_prompt"]
    negative_text = ai_scene_info["ai_scene"]["prompt"]["negative_prompt"]
    settings = ai_scene_info["ai_scene"]["settings"]

    base_path = "D://Git//ap-canvas-creation-module//03_blender//sd_blender//sample_scene//Renders//"
    render_dir = base_path + ai_scene_info["project_id"] + "/"
    image_base_path = render_dir + "base0001.jpg"
    image_alpha_products_path = render_dir + "alpha_products0001.jpg"

    set_filename(
        prompt,
        "Save Image",
        "{project_id}/basic_api_example".format(project_id=ai_scene_info["project_id"]),
    )

    ksampler_main = _require_node(prompt, "KSampler")
    ksampler_main["inputs"]["noise_seed"] = random.randint(0, 1000000)
    ksampler_main["inputs"]["steps"] = settings["steps"]
    ksampler_main["inputs"]["cfg"] = settings["cfg"]

    # Both text slots of each encoder node receive the same prompt text.
    prompt_positive = _require_node(prompt, "positive_CLIPTextEncodeSDXL")
    prompt_positive["inputs"]["text_g"] = positive_text
    prompt_positive["inputs"]["text_l"] = positive_text

    prompt_negative = _require_node(prompt, "negative_CLIPTextEncodeSDXL")
    prompt_negative["inputs"]["text_g"] = negative_text
    prompt_negative["inputs"]["text_l"] = negative_text

    _require_node(prompt, "image_base")["inputs"]["image"] = image_base_path
    _require_node(prompt, "image_product_mask")["inputs"]["image"] = (
        image_alpha_products_path
    )
    # NOTE: the "image_depth" node is intentionally left as defined in the
    # workflow file; no depth image path is set here.

    return prompt


def get_image(filename, subfolder, folder_type):
    """Fetch one image from the server's /view endpoint and return its raw bytes."""
    query = urllib.parse.urlencode(
        {"filename": filename, "subfolder": subfolder, "type": folder_type}
    )
    with urllib.request.urlopen(
        "http://{}/view?{}".format(server_address, query)
    ) as response:
        return response.read()


def get_history(prompt_id):
    """Return the decoded JSON from the server's /history/<prompt_id> endpoint."""
    with urllib.request.urlopen(
        "http://{}/history/{}".format(server_address, prompt_id)
    ) as response:
        return json.loads(response.read())


def get_images(ws, prompt):
    """Queue ``prompt``, wait for it to finish, and download its images.

    Blocks on ``ws.recv()`` until an ``executing`` message arrives whose
    ``node`` is ``None`` for the queued prompt id, then reads the prompt's
    history and fetches every listed image.

    Returns a dict mapping node id to a list of
    ``{"filename", "data", "type"}`` dicts (an empty list for nodes without
    images).
    """
    prompt_id = queue_prompt(prompt)["prompt_id"]
    output_images = {}

    while True:
        out = ws.recv()
        if not isinstance(out, str):
            continue  # previews are binary data
        message = json.loads(out)
        if message["type"] == "executing":
            data = message["data"]
            if data["node"] is None and data["prompt_id"] == prompt_id:
                break  # Execution is done

    history = get_history(prompt_id)[prompt_id]
    for node_id, node_output in history["outputs"].items():
        images_output = []
        for image in node_output.get("images", []):
            image_data = get_image(
                image["filename"], image["subfolder"], image["type"]
            )
            images_output.append(
                {
                    "filename": image["filename"],
                    "data": image_data,
                    "type": image["type"],
                }
            )
        output_images[node_id] = images_output

    return output_images


def main(*args):
    """Run one generation job described by the base64 argument after ``--``.

    The first command-line argument following ``--`` (read from ``sys.argv``;
    ``args`` is accepted but not used) is decoded into the scene description.
    The workflow is queued, and each resulting image of type ``"output"`` is
    read from the local output folder and uploaded. The task status is set to
    2 before generation starts.

    Exits with status 1 when the scene description cannot be read from the
    command line.
    """
    try:
        cli_args = sys.argv[sys.argv.index("--") + 1 :]
        ai_scene_info = convert_base64_string_to_object(cli_args[0])
        row_id = ai_scene_info["image_id"]
    except (ValueError, IndexError, KeyError, TypeError) as e:
        # Without a scene description nothing below can run; stop here instead
        # of failing later on undefined variables.
        # For local debugging, load_debug_ai_scene_info() can supply one.
        print("Error:", e)
        sys.exit(1)
    print("loading scene data", ai_scene_info)

    prompt = get_prompt(ai_scene_info)

    ws = websocket.WebSocket()
    ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id))
    try:
        update_ai_image_task_status(row_id, 2)
        images = get_images(ws, prompt)
    finally:
        # The socket is only needed while waiting for execution to finish.
        ws.close()

    for node_images in images.values():
        for image_info in node_images:
            if image_info["type"] != "output":
                continue

            response = get_ai_image_task(row_id)
            if response is None:
                print(
                    "Skipping upload of {}: task lookup failed".format(
                        image_info["filename"]
                    )
                )
                continue

            data = json.loads(response["data"])
            project_id = data["project_id"]
            complete_image_path = image_path + "{}/{}".format(
                project_id, image_info["filename"]
            )
            print(complete_image_path)

            image_base64 = find_image_and_convert_to_base64(complete_image_path)
            upload_image_to_anvil(row_id, image_base64)


if __name__ == "__main__":
    main(sys.argv)