autogpt 还原,开新分支编写
This commit is contained in:
@ -1,10 +1,7 @@
|
||||
""" Image Generation Module for AutoGPT."""
|
||||
import io
|
||||
import json
|
||||
import time
|
||||
import uuid
|
||||
from base64 import b64decode
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import openai
|
||||
import requests
|
||||
@ -12,20 +9,12 @@ from PIL import Image
|
||||
|
||||
from autogpt.commands.command import command
|
||||
from autogpt.config import Config
|
||||
from autogpt.logs import logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from autogpt.config import Config
|
||||
CFG = Config()
|
||||
|
||||
|
||||
@command(
|
||||
"generate_image",
|
||||
"Generate Image",
|
||||
'"prompt": "<prompt>"',
|
||||
lambda config: config.image_provider,
|
||||
"Requires a image provider to be set.",
|
||||
)
|
||||
def generate_image(prompt: str, config: Config, size: int = 256) -> str:
|
||||
@command("generate_image", "Generate Image", '"prompt": "<prompt>"', CFG.image_provider)
|
||||
def generate_image(prompt: str, size: int = 256) -> str:
|
||||
"""Generate an image from a prompt.
|
||||
|
||||
Args:
|
||||
@ -35,21 +24,21 @@ def generate_image(prompt: str, config: Config, size: int = 256) -> str:
|
||||
Returns:
|
||||
str: The filename of the image
|
||||
"""
|
||||
filename = f"{config.workspace_path}/{str(uuid.uuid4())}.jpg"
|
||||
filename = f"{CFG.workspace_path}/{str(uuid.uuid4())}.jpg"
|
||||
|
||||
# DALL-E
|
||||
if config.image_provider == "dalle":
|
||||
return generate_image_with_dalle(prompt, filename, size, config)
|
||||
if CFG.image_provider == "dalle":
|
||||
return generate_image_with_dalle(prompt, filename, size)
|
||||
# HuggingFace
|
||||
elif config.image_provider == "huggingface":
|
||||
return generate_image_with_hf(prompt, filename, config)
|
||||
elif CFG.image_provider == "huggingface":
|
||||
return generate_image_with_hf(prompt, filename)
|
||||
# SD WebUI
|
||||
elif config.image_provider == "sdwebui":
|
||||
return generate_image_with_sd_webui(prompt, filename, config, size)
|
||||
elif CFG.image_provider == "sdwebui":
|
||||
return generate_image_with_sd_webui(prompt, filename, size)
|
||||
return "No Image Provider Set"
|
||||
|
||||
|
||||
def generate_image_with_hf(prompt: str, filename: str, config: Config) -> str:
|
||||
def generate_image_with_hf(prompt: str, filename: str) -> str:
|
||||
"""Generate an image with HuggingFace's API.
|
||||
|
||||
Args:
|
||||
@ -60,58 +49,34 @@ def generate_image_with_hf(prompt: str, filename: str, config: Config) -> str:
|
||||
str: The filename of the image
|
||||
"""
|
||||
API_URL = (
|
||||
f"https://api-inference.huggingface.co/models/{config.huggingface_image_model}"
|
||||
f"https://api-inference.huggingface.co/models/{CFG.huggingface_image_model}"
|
||||
)
|
||||
if config.huggingface_api_token is None:
|
||||
if CFG.huggingface_api_token is None:
|
||||
raise ValueError(
|
||||
"You need to set your Hugging Face API token in the config file."
|
||||
)
|
||||
headers = {
|
||||
"Authorization": f"Bearer {config.huggingface_api_token}",
|
||||
"Authorization": f"Bearer {CFG.huggingface_api_token}",
|
||||
"X-Use-Cache": "false",
|
||||
}
|
||||
|
||||
retry_count = 0
|
||||
while retry_count < 10:
|
||||
response = requests.post(
|
||||
API_URL,
|
||||
headers=headers,
|
||||
json={
|
||||
"inputs": prompt,
|
||||
},
|
||||
)
|
||||
response = requests.post(
|
||||
API_URL,
|
||||
headers=headers,
|
||||
json={
|
||||
"inputs": prompt,
|
||||
},
|
||||
)
|
||||
|
||||
if response.ok:
|
||||
try:
|
||||
image = Image.open(io.BytesIO(response.content))
|
||||
logger.info(f"Image Generated for prompt:{prompt}")
|
||||
image.save(filename)
|
||||
return f"Saved to disk:{filename}"
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
break
|
||||
else:
|
||||
try:
|
||||
error = json.loads(response.text)
|
||||
if "estimated_time" in error:
|
||||
delay = error["estimated_time"]
|
||||
logger.debug(response.text)
|
||||
logger.info("Retrying in", delay)
|
||||
time.sleep(delay)
|
||||
else:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
break
|
||||
image = Image.open(io.BytesIO(response.content))
|
||||
print(f"Image Generated for prompt:{prompt}")
|
||||
|
||||
retry_count += 1
|
||||
image.save(filename)
|
||||
|
||||
return f"Error creating image."
|
||||
return f"Saved to disk:{filename}"
|
||||
|
||||
|
||||
def generate_image_with_dalle(
|
||||
prompt: str, filename: str, size: int, config: Config
|
||||
) -> str:
|
||||
def generate_image_with_dalle(prompt: str, filename: str, size: int) -> str:
|
||||
"""Generate an image with DALL-E.
|
||||
|
||||
Args:
|
||||
@ -122,11 +87,12 @@ def generate_image_with_dalle(
|
||||
Returns:
|
||||
str: The filename of the image
|
||||
"""
|
||||
openai.api_key = CFG.openai_api_key
|
||||
|
||||
# Check for supported image sizes
|
||||
if size not in [256, 512, 1024]:
|
||||
closest = min([256, 512, 1024], key=lambda x: abs(x - size))
|
||||
logger.info(
|
||||
print(
|
||||
f"DALL-E only supports image sizes of 256x256, 512x512, or 1024x1024. Setting to {closest}, was {size}."
|
||||
)
|
||||
size = closest
|
||||
@ -136,10 +102,9 @@ def generate_image_with_dalle(
|
||||
n=1,
|
||||
size=f"{size}x{size}",
|
||||
response_format="b64_json",
|
||||
api_key=config.openai_api_key,
|
||||
)
|
||||
|
||||
logger.info(f"Image Generated for prompt:{prompt}")
|
||||
print(f"Image Generated for prompt:{prompt}")
|
||||
|
||||
image_data = b64decode(response["data"][0]["b64_json"])
|
||||
|
||||
@ -152,7 +117,6 @@ def generate_image_with_dalle(
|
||||
def generate_image_with_sd_webui(
|
||||
prompt: str,
|
||||
filename: str,
|
||||
config: Config,
|
||||
size: int = 512,
|
||||
negative_prompt: str = "",
|
||||
extra: dict = {},
|
||||
@ -169,13 +133,13 @@ def generate_image_with_sd_webui(
|
||||
"""
|
||||
# Create a session and set the basic auth if needed
|
||||
s = requests.Session()
|
||||
if config.sd_webui_auth:
|
||||
username, password = config.sd_webui_auth.split(":")
|
||||
if CFG.sd_webui_auth:
|
||||
username, password = CFG.sd_webui_auth.split(":")
|
||||
s.auth = (username, password or "")
|
||||
|
||||
# Generate the images
|
||||
response = requests.post(
|
||||
f"{config.sd_webui_url}/sdapi/v1/txt2img",
|
||||
f"{CFG.sd_webui_url}/sdapi/v1/txt2img",
|
||||
json={
|
||||
"prompt": prompt,
|
||||
"negative_prompt": negative_prompt,
|
||||
@ -189,7 +153,7 @@ def generate_image_with_sd_webui(
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(f"Image Generated for prompt:{prompt}")
|
||||
print(f"Image Generated for prompt:{prompt}")
|
||||
|
||||
# Save the image to disk
|
||||
response = response.json()
|
||||
|
||||
Reference in New Issue
Block a user