ChatGPT agent example#
Basic example#
ChatGPT can be used in lots of cases, for example, sentiment analysis, language translation, SQL query generation, and text summarization.
This example shows you how to run ChatGPT tasks in flyte.
from typing import List
import flytekit
from flytekit import ImageSpec, Secret, dynamic, task, workflow
from flytekitplugins.openai import ChatGPTTask
You have to specify your name
, openai_organization
and chatgpt_config
.
name
is for Flyte and it should be unique.openai_organization
is for the OpenAI API. You can find it here.chatgpt_config
is for OpenAI chat completion. You can find it here.
chatgpt_small_job = ChatGPTTask(
name="3.5-turbo",
openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
chatgpt_config={
"model": "gpt-3.5-turbo",
"temperature": 0.7,
},
)
chatgpt_big_job = ChatGPTTask(
name="gpt-4",
openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
chatgpt_config={
"model": "gpt-4",
"temperature": 0.7,
},
)
@workflow
def my_chatgpt_job(message: str) -> str:
message = chatgpt_small_job(message=message)
message = chatgpt_big_job(message=message)
return message
You can execute the workflow locally.
if __name__ == "__main__":
print(f"Running {__file__} main...")
print(f"Running my_chatgpt_job(message='hi') {my_chatgpt_job(message='hi')}")
ChatGPT summary bot#
These examples show you a real use case of ChatGPT in the production mode.
For more details, see the FlyteChatGPT Summary Bot GitHub repository and the demo video.
Summarize Flyte’s latest GitHub releases to Slack#
image = ImageSpec(
apt_packages=["git"],
packages=[
"flytekitplugins-chatgpt",
"requests",
"slack_sdk",
],
registry="ghcr.io/flyteorg",
)
chatgpt_job = ChatGPTTask(
name="3.5-turbo",
openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
chatgpt_config={
"model": "gpt-3.5-turbo",
"temperature": 0.7,
},
)
@task(
container_image=image,
secret_requests=[Secret(key="token", group="github-api")],
)
def get_github_latest_release(owner: str = "flyteorg", repo: str = "flyte") -> str:
import requests
token = flytekit.current_context().secrets.get("github-api", "token")
url = f"https://api.github.com/repos/{owner}/{repo}/releases/latest"
headers = {
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(url, headers=headers)
message = (
"You are a Bot. Provide a summary of the latest Flyte Github releases for users on Slack."
"Ensure the response fits within 4000 characters, suitable for a Slack message. "
"Start the message with 'These are the latest Flyte GitHub releases'. "
f"End the message with 'Check out the releases page here: https://github.com/{owner}/{repo}/releases'. "
"Note: Handling via the Slack API is not required. Format the response in bullet points.\n\n"
f"Latest releases:\n{response.json()['body']}"
)
return message
@task(
container_image=image,
secret_requests=[Secret(key="token", group="slack-api")],
)
def post_message_on_slack(message: str):
if message == "":
return
from slack_sdk import WebClient
token = flytekit.current_context().secrets.get("slack-api", "token")
client = WebClient(token=token)
client.chat_postMessage(channel="youtube-summary", text=message)
@workflow
def slack_wf(owner: str = "flyteorg", repo: str = "flyte", channel: str = "demo"):
message = get_github_latest_release(owner=owner, repo=repo)
message = chatgpt_job(message=message)
post_message_on_slack(message=message)
if __name__ == "__main__":
slack_wf()
Summarize Flyte’s latest YouTube Video to Slack#
image = ImageSpec(
apt_packages=["git"],
packages=[
"flytekitplugins-chatgpt",
"scrapetube==2.5.1",
"youtube_transcript_api==0.6.1",
"slack_sdk==3.23.0",
],
registry="ghcr.io/flyteorg",
)
chatgpt_job = ChatGPTTask(
name="3.5-turbo",
openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
chatgpt_config={
"model": "gpt-3.5-turbo",
"temperature": 0.7,
},
)
@task(container_image=image)
def get_latest_video_transcript_chunks(channel_url: str) -> List[str]:
import scrapetube
from youtube_transcript_api import YouTubeTranscriptApi
# fetch_latest_video_id
video_generator = scrapetube.get_channel(channel_url=channel_url)
latest_video = next(video_generator)
video_id = latest_video["videoId"]
# fetch_transcript
transcript = YouTubeTranscriptApi.get_transcript(video_id)
# chunk_transcript
text_transcript = "\n".join([entry["text"] for entry in transcript])
return [text_transcript[i : i + 10000] for i in range(0, len(text_transcript), 10000)]
@workflow
def video_wf(channel_url: str):
chunks = get_latest_video_transcript_chunks(channel_url=channel_url)
dynamic_subwf(channel_url=channel_url, chunks=chunks)
@task(container_image=image)
def check_strs_len_less_than_num(msg1: str, msg2: str, num: int) -> bool:
return len(msg1) + len(msg2) < num
@task(container_image=image)
def concatenate_str(msg1: str, msg2: str) -> str:
return msg1 + msg2 + "\n"
@task(container_image=image)
def str_is_non_empty(msg: str) -> bool:
return len(msg) == 0
@dynamic(container_image=image)
def dynamic_subwf(channel_url: str, chunks: List[str]):
post_message_on_slack(
message=f"This is the latest video summary, checkout in Flyte's Youtube Channel!\n{channel_url}"
)
summary_messages = []
for chunk in chunks:
message = chatgpt_job(
message=concatenate_str(
msg1=(
"Please provide a summary of the following portion of the transcript"
" from the latest Flyte YouTube video. Note: This is only a segment"
" of the entire transcript, which has been split into multiple parts."
" The summary should be concise, not exceeding 4000 characters, and"
" suitable for sharing on Slack."
"Note: Handling via the Slack API is not required. Format the response in bullet points.\n\n"
"Transcript:\n"
),
msg2=chunk,
)
)
summary_messages.append(message)
message = ""
for summary_message in summary_messages:
b = check_strs_len_less_than_num(msg1=message, msg2=summary_message, num=15000)
if b.is_true:
message = concatenate_str(msg1=message, msg2=summary_message)
if b.is_false:
message = chatgpt_job(
message=concatenate_str(
msg1=(
"Please provide a concise summary of the following messages"
" generated by ChatGPT. The summary should be suitable for sharing"
" on Slack and not exceed 4000 characters."
"Note: Handling via the Slack API is not required. Format the response in bullet points.\n\n"
"Transcript:\n"
),
msg2=message,
)
)
post_message_on_slack(message=message)
message = summary_message
b = str_is_non_empty(msg=message)
if b.is_true:
message = chatgpt_job(
message=concatenate_str(
msg1=(
"Please provide a concise summary of the following messages"
" generated by ChatGPT. The summary should be suitable for sharing"
" on Slack and not exceed 4000 characters."
"Note: Handling via the Slack API is not required. Format the response in bullet points.\n\n"
"Transcript:\n"
),
msg2=message,
)
)
post_message_on_slack(message=message)
if __name__ == "__main__":
video_wf(channel_url="https://www.youtube.com/@flyteorg")
Summarize the latest MLOps trend from Medium to Twitter#
Note
This example only works in a local environment.
chatgpt_job = ChatGPTTask(
name="3.5-turbo",
openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
chatgpt_config={
"model": "gpt-3.5-turbo",
"temperature": 0.7,
},
)
@task
def get_weekly_articles_title(url: str = "https://medium.com/tag/flyte") -> str:
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
options = Options()
options.add_argument("--headless")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options)
driver.get(url)
page_source = driver.page_source
driver.quit()
soup = BeautifulSoup(page_source, "html.parser")
texts = soup.stripped_strings
all_text = " ".join(texts)
message = (
f"You are a Bot. Provide a summary of the latest MLOps trend for users on Medium. "
f"Your response should fit within 280 characters for a tweet, excluding the article's title. "
f"Start the message with '''This is the trend of MLOps on Medium this week\n'''. Note: Tweet API handling is not required."
f"```````"
f"Article Title: {all_text}"
)
return message
@task(
secret_requests=[
Secret(key="bearer_token", group="tweet-api"),
Secret(key="consumer_key", group="tweet-api"),
Secret(key="consumer_secret", group="tweet-api"),
Secret(key="access_token", group="tweet-api"),
Secret(key="access_token_secret", group="tweet-api"),
],
)
def tweet(text: str):
import tweepy
TWEET_LENGTH = 280
BEARER_TOKEN = flytekit.current_context().secrets.get("tweet-api", "bearer_token")
CONSUMER_KEY = flytekit.current_context().secrets.get("tweet-api", "consumer_key")
CONSUMER_SECRET = flytekit.current_context().secrets.get("tweet-api", "consumer_secret")
ACCESS_TOKEN = flytekit.current_context().secrets.get("tweet-api", "access_token")
ACCESS_TOKEN_SECRET = flytekit.current_context().secrets.get("tweet-api", "access_token_secret")
client = tweepy.Client(
bearer_token=BEARER_TOKEN,
consumer_key=CONSUMER_KEY,
consumer_secret=CONSUMER_SECRET,
access_token=ACCESS_TOKEN,
access_token_secret=ACCESS_TOKEN_SECRET,
)
if len(text) > TWEET_LENGTH:
text = text[:TWEET_LENGTH]
client.create_tweet(text=text)
@workflow
def tweet_wf(url: str = "https://medium.com/tag/flyte"):
message = get_weekly_articles_title(url=url)
message = chatgpt_job(message=message)
tweet(text=message)
if __name__ == "__main__":
tweet_wf()