import requests from pathlib import Path import json import base64 DDL_SERVER = "http://69.230.251.234:8081/api" script_path_ai = ( "D://Git//ap-canvas-creation-module//04_stable_diffusion//sd_comfy_api_v2.py" ) # script_path = Path(__file__).resolve() script_path = "D://Git//ap-canvas-creation-module//03_blender//sd_blender//" # scene_path = str(script_path.parent / "sample_scene" / "Canvas_Render_Scene.blend") scene_path = "D://Git//ap-canvas-creation-module//03_blender//sd_blender//sample_scene//Canvas_Render_Scene.blend" # render_script_path = str(script_path.parent / "zs_ai_render_script.py") render_script_path = ( "D://Git//ap-canvas-creation-module//03_blender//sd_blender//zs_ai_render_script.py" ) post_job_script_path = "D:/Git/ap-canvas-creation-module/04_stable_diffusion/zs_ai_post_ai_render_script.py" def convert_object_to_base64_string(obj): return base64.b64encode(json.dumps(obj).encode("utf-8")).decode("utf-8") def send_3d_ddl_job( job_info={}, plugin_info={}, ): url = f"{DDL_SERVER}/jobs" response = requests.post( url, json={ "JobInfo": job_info, "PluginInfo": plugin_info, "AuxFiles": [], "IdOnly": True, }, ) if response.status_code == 200: print("Data posted successfully.") return response.json() else: print(f"Failed to post data. Status code: {response.status_code}") return None def send_sd_comfy_ddl_job( job_info={}, plugin_info={}, ): url = f"{DDL_SERVER}/jobs" data = { "JobInfo": job_info, "PluginInfo": plugin_info, "AuxFiles": [], "IdOnly": True, } response = requests.post(url, json=data) if response.status_code == 200: print("Data posted successfully.") return response.json() else: print(f"Failed to post data. Status code: {response.status_code}") return None def get_scene_data(): print("Loading Scene Data") # load scene data # to be replaced with actual data # open scene_info.json script_path = Path(__file__).resolve() scene_data_path = script_path.parent / "sample_scene" / "scene_info.json" with scene_data_path.open() as file: scene_data = json.load(file) print(scene_data) return scene_data def get_ai_scene_data(): print("Loading AI Scene Data") # load scene data # to be replaced with actual data # open scene_info.json script_path = Path(__file__).resolve() scene_data_path = script_path.parent / "sample_scene" / "ai_scene_info.json" with scene_data_path.open() as file: ai_scene_data = json.load(file) print(ai_scene_data) return ai_scene_data def submit_3d_job(username, scene_data): # create 3d ddl job ddl_3d_job = send_3d_ddl_job( job_info={ "Name": scene_data["project_id"], "UserName": username, "Frames": 1, "Priority": 49, "Plugin": "DevBlender", "ChunkSize": "1", "MachineName": "AI_PC", # "PostJobScript": "D://Git//ap-canvas-creation-module//04_stable_diffusion//sd_comfy_api.py", }, plugin_info={ "SceneFile": scene_path, "AdditionalArguments": f"-P {render_script_path}", "CustomArguments": json.dumps(scene_data), }, ) return ddl_3d_job # create ai ddl job def submit_ai_image_job(username, ai_scene_data, dependency_job_id=""): ddl_ai_job = send_sd_comfy_ddl_job( job_info={ "Name": "AI_Test_Job_01", "UserName": username, "Plugin": "Python", "Priority": 49, "Name": "ComfyUI Job", "JobDependencies": dependency_job_id, "PostJobScript": post_job_script_path, }, plugin_info={ "ScriptFile": script_path_ai, "Version": "3.1", "Arguments": f"-- {convert_object_to_base64_string(ai_scene_data)}", }, ) return ddl_ai_job def submit_3d_and_ai_image_job(username, scene_data, ai_scene_data): ddl_3d_job = submit_3d_job(username, scene_data) ddl_ai_job = submit_ai_image_job(username, ai_scene_data, ddl_3d_job["_id"]) return ddl_3d_job, ddl_ai_job def submit_test_job(): scene_data = get_scene_data() ai_scene_data = get_ai_scene_data() # ddl_3d_job = submit_3d_job("test_user", scene_data) ddl_ai_job = submit_ai_image_job("test_user", ai_scene_data) # ddl_3d_job, ddl_ai_job = submit_3d_and_ai_image_job( # "test_user", scene_data, scene_data # ) submit_test_job()