sd_comfy_api_v2.py

# This is an example that uses the websockets api to know when a prompt execution is done.
# Once the prompt execution is done it downloads the images using the /history endpoint.
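#
# Flow, as implemented below: decode the scene info passed on the command line,
# patch a saved ComfyUI workflow with the prompts and settings, queue it, wait
# on the websocket until execution finishes, then upload the rendered preview
# image to the Anvil backend and update the task status.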
import websocket  # NOTE: websocket-client (https://github.com/websocket-client/websocket-client)
import uuid
import json
import urllib.request
import urllib.parse
import random
import sys
import base64

import requests

server_address = "127.0.0.1:8188"
client_id = str(uuid.uuid4())
api_path = "https://canvas-api-test.anvil.app/_/api"
image_path = (
    "D:/Temp/ComfyUI_windows_portable_nvidia/ComfyUI_windows_portable/ComfyUI/output/"
)
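
# server_address points at a locally running ComfyUI instance, api_path at the
# Anvil backend API, and image_path at ComfyUI's output directory; all three
# are environment-specific and need adjusting on another machine.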

def update_ai_image_task_status(row_id, new_status):
    # Define the URL for the API endpoint
    url = "{}/creation-module/ai-image/update-status".format(api_path)
    # Create a JSON payload
    payload = {"row_id": row_id, "new_status": new_status}
    # Make a POST request to the API endpoint with the JSON payload
    response = requests.post(url, json=payload)
    # Handle the response
    if response.status_code == 200:
        print("Status update was successful")
        return response.json()
    else:
        print("Status update failed")
        print("Status code:", response.status_code)
        print("Response:", response.text)
        return None

def get_ai_image_task(row_id):
    # Define the URL for the API endpoint
    url = "{}/creation-module/ai-image/{}".format(api_path, row_id)
    print("Constructed URL:", url)  # Print the URL for debugging
    # Make a GET request to the API endpoint
    response = requests.get(url)
    # Handle the response
    if response.status_code == 200:
        print("Request was successful")
        return response.json()
    else:
        print("Request failed")
        print("Status code:", response.status_code)
        print("Response:", response.text)
        return None

def find_image_and_convert_to_base64(image_path):
    with open(image_path, "rb") as image_file:
        image_data = image_file.read()
    image_base64 = base64.b64encode(image_data).decode("utf-8")
    return image_base64

def upload_image_to_anvil(row_id, image_base64):
    url = "{}/creation-module/ai-image/upload-preview".format(api_path)
    payload = {"row_id": row_id, "image_base64": image_base64}
    response = requests.post(url, json=payload)
    if response.status_code == 200:
        print("Image uploaded successfully")
        update_ai_image_task_status(row_id=row_id, new_status=3)
        return response.json()
    else:
        print("Image upload failed")
        print("Status code:", response.status_code)
        print("Response:", response.text)
        return None

def load_debug_ai_scene_info():
    # open ai_scene_info.json
    with open(
        "D:/Git/ap-canvas-creation-module/04_stable_diffusion/ai_scene_info.json", "r"
    ) as f:
        ai_scene_info = json.load(f)
    return ai_scene_info

def convert_base64_string_to_object(base64_string):
    decoded_bytes = base64.b64decode(base64_string)
    decoded_string = decoded_bytes.decode("ascii")
    return json.loads(decoded_string)
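
# Round-trip sketch (assumes the caller encodes the payload the same way, i.e.
# the exact inverse of the function above):
#   encoded = base64.b64encode(json.dumps(ai_scene_info).encode("ascii")).decode("ascii")
#   assert convert_base64_string_to_object(encoded) == ai_scene_info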

def set_filename(json_obj, title, new_prefix):
    for key, value in json_obj.items():
        if isinstance(value, dict):
            if value.get("_meta", {}).get("title") == title:
                if "inputs" in value and "filename_prefix" in value["inputs"]:
                    value["inputs"]["filename_prefix"] = new_prefix
                    return new_prefix
            else:
                result = set_filename(value, title, new_prefix)
                if result:
                    return result
    return None

def find_node(json_obj, title):
    for key, value in json_obj.items():
        if isinstance(value, dict):
            if value.get("_meta", {}).get("title") == title:
                return value
            else:
                result = find_node(value, title)
                if result:
                    return result
    return None
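
# Both helpers above walk a ComfyUI API-format workflow: a dict mapping node
# ids to node dicts. Illustrative shape only (the titles come from the saved
# workflow, not from ComfyUI itself):
# {
#     "9": {
#         "class_type": "SaveImage",
#         "inputs": {"filename_prefix": "ComfyUI", "images": ["8", 0]},
#         "_meta": {"title": "Save Image"},
#     }
# }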

def queue_prompt(prompt):
    p = {"prompt": prompt, "client_id": client_id}
    data = json.dumps(p).encode("utf-8")
    req = urllib.request.Request("http://{}/prompt".format(server_address), data=data)
    return json.loads(urllib.request.urlopen(req).read())
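
# POSTing to /prompt queues the workflow; the JSON response includes a
# "prompt_id", which get_images() below uses to match websocket messages and
# to query /history once execution finishes.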

def get_prompt(ai_scene_info):
    with open(
        "D:/Git/ap-canvas-creation-module/04_stable_diffusion/workflows/canvas_3d_to_img_standard_V1.json",
        "r",
    ) as f:
        prompt_text_json = f.read()
    prompt = json.loads(prompt_text_json)

    # set the text prompt for our positive CLIPTextEncode
    positive_text = ai_scene_info["ai_scene"]["prompt"]["positive_prompt"]
    negative_text = ai_scene_info["ai_scene"]["prompt"]["negative_prompt"]

    base_path = "D:/Git/ap-canvas-creation-module/03_blender/sd_blender/sample_scene/Renders/"
    render_path = base_path + ai_scene_info["project_id"] + "/"
    image_base_path = render_path + "base0001.jpg"
    image_alpha_products_path = render_path + "alpha_products0001.jpg"
    # image_depth_path = render_path + "depth0001.png"

    set_filename(
        prompt,
        "Save Image",
        "{project_id}/basic_api_example".format(project_id=ai_scene_info["project_id"]),
    )

    ksampler_main = find_node(prompt, "KSampler")
    ksampler_main["inputs"]["noise_seed"] = random.randint(0, 1000000)
    ksampler_main["inputs"]["steps"] = ai_scene_info["ai_scene"]["settings"]["steps"]
    ksampler_main["inputs"]["cfg"] = ai_scene_info["ai_scene"]["settings"]["cfg"]

    prompt_positive = find_node(prompt, "positive_CLIPTextEncodeSDXL")
    prompt_positive["inputs"]["text_g"] = positive_text
    prompt_positive["inputs"]["text_l"] = positive_text

    prompt_negative = find_node(prompt, "negative_CLIPTextEncodeSDXL")
    prompt_negative["inputs"]["text_g"] = negative_text
    prompt_negative["inputs"]["text_l"] = negative_text

    image_base = find_node(prompt, "image_base")
    image_base["inputs"]["image"] = image_base_path

    image_product_mask = find_node(prompt, "image_product_mask")
    image_product_mask["inputs"]["image"] = image_alpha_products_path

    image_depth = find_node(prompt, "image_depth")
    # image_depth["inputs"]["image"] = image_depth_path

    return prompt

def get_image(filename, subfolder, folder_type):
    data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
    url_values = urllib.parse.urlencode(data)
    with urllib.request.urlopen(
        "http://{}/view?{}".format(server_address, url_values)
    ) as response:
        return response.read()

def get_history(prompt_id):
    with urllib.request.urlopen(
        "http://{}/history/{}".format(server_address, prompt_id)
    ) as response:
        return json.loads(response.read())
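
# /history/<prompt_id> returns a dict keyed by prompt id; each entry's
# "outputs" maps node ids to their results (image filenames, subfolders,
# types), which get_images() below iterates to fetch the files via /view.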

def get_images(ws, prompt):
    prompt_id = queue_prompt(prompt)["prompt_id"]
    output_images = {}
    while True:
        out = ws.recv()
        if isinstance(out, str):
            message = json.loads(out)
            if message["type"] == "executing":
                data = message["data"]
                if data["node"] is None and data["prompt_id"] == prompt_id:
                    break  # Execution is done
        else:
            continue  # previews are binary data

    history = get_history(prompt_id)[prompt_id]
    for node_id in history["outputs"]:
        node_output = history["outputs"][node_id]
        images_output = []
        if "images" in node_output:
            for image in node_output["images"]:
                image_data = get_image(
                    image["filename"], image["subfolder"], image["type"]
                )
                images_output.append(
                    {
                        "filename": image["filename"],
                        "data": image_data,
                        "type": image["type"],
                    }
                )
        output_images[node_id] = images_output

    return output_images

def main():
    argv = sys.argv
    try:
        # Arguments after "--" belong to this script.
        argv = argv[argv.index("--") + 1 :]
        ai_scene_info = convert_base64_string_to_object(argv[0])
        row_id = ai_scene_info["image_id"]
        print("loading scene data", ai_scene_info)
    except Exception as e:
        print("Error:", e)
        # ai_scene_info = load_debug_ai_scene_info()
        sys.exit(1)  # Without valid scene data there is nothing to process.

    prompt = get_prompt(ai_scene_info)

    ws = websocket.WebSocket()
    ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id))
    update_ai_image_task_status(row_id, 2)

    images = get_images(ws, prompt)
    for node_id in images:
        for image_info in images[node_id]:
            if image_info["type"] == "output":
                response = get_ai_image_task(row_id)
                data = json.loads(response["data"])
                project_id = data["project_id"]
                complete_image_path = image_path + "{}/{}".format(
                    project_id, image_info["filename"]
                )
                print(complete_image_path)
                image_base64 = find_image_and_convert_to_base64(complete_image_path)
                upload_image_to_anvil(row_id, image_base64)


if __name__ == "__main__":
    main()
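
# Example invocation (illustrative only; the "--" separator matches the argv
# parsing in main(), the convention Blender uses for passing arguments through
# to a script):
#   python sd_comfy_api_v2.py -- <base64-encoded ai_scene_info JSON>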