Skip to content

Commit 26af761

Browse files
authored
[Flytekit] Add custom agent template in Pyflyte Init (#51)
* feat: add template for custom agent build Signed-off-by: mao3267 <[email protected]> * fix: directory tree Signed-off-by: mao3267 <[email protected]> * fix: update sync agent openai Signed-off-by: mao3267 <[email protected]> * fix: rename directory Signed-off-by: mao3267 <[email protected]> * fix: update plugin install source Signed-off-by: mao3267 <[email protected]> * fix: copy cmd error Signed-off-by: mao3267 <[email protected]> * fix: use template from Future-Outlier/flyte-custom-agent-template Signed-off-by: mao3267 <[email protected]> * fix: standardize getopts parameters Signed-off-by: mao3267 <[email protected]> * fix: add newline Signed-off-by: mao3267 <[email protected]> --------- Signed-off-by: mao3267 <[email protected]>
1 parent 8a6b3a1 commit 26af761

File tree

17 files changed

+528
-3
lines changed

17 files changed

+528
-3
lines changed

basic-custom-agent/cookiecutter.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"project_name": "Basic custom agent"
3+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
FROM python:3.10-slim-bookworm AS agent-slim
2+
3+
ARG VERSION
4+
5+
# Install required dependencies
6+
RUN apt-get update && apt-get install -y \
7+
build-essential \
8+
git \
9+
&& apt-get clean \
10+
&& rm -rf /var/lib/apt/lists/*
11+
12+
# Install Python dependencies
13+
RUN pip install --no-cache-dir \
14+
prometheus-client \
15+
grpcio-health-checking==1.67.1
16+
17+
# Install Flytekit from GitHub
18+
RUN pip install --no-cache-dir git+https://github.com/flyteorg/flytekit.git@master
19+
20+
# Copy and install the bigquery plugin
21+
COPY flytekit-bigquery /flytekit-bigquery
22+
RUN pip install --no-cache-dir /flytekit-bigquery
23+
24+
COPY flytekit-openai /flytekit-openai
25+
RUN pip install --no-cache-dir /flytekit-openai
26+
27+
# Cleanup
28+
RUN apt-get purge -y build-essential git \
29+
&& apt-get autoremove -y \
30+
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
31+
32+
# Set the default command
33+
CMD ["pyflyte", "serve", "agent", "--port", "8000"]
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# flyte-custom-agent-template
2+
How to write your custom agent and build it with a Dockerfile.
3+
4+
## Concepts
5+
1. flytekit will load plugin [here](https://github.com/flyteorg/flytekit/blob/ff2d0da686c82266db4dbf764a009896cf062349/flytekit/__init__.py#L322-L323),
6+
so you must add your plugin to `entry_points` in [setup.py](https://github.com/Future-Outlier/flyte-custom-agent-template/blob/main/flytekit-bigquery/setup.py#L39).
7+
2. Agent registration is triggered by loading the plugin. For example,
8+
BigQuery's agent registration is triggered [here](https://github.com/Future-Outlier/flyte-custom-agent/blob/main/flytekit-bigquery/flytekitplugins/bigquery/agent.py#L97)
9+
10+
## Build your custom agent
11+
1. Following the folder structure in this repo, you can build your custom agent.
12+
2. Build your own custom agent ([learn more](https://docs.flyte.org/en/latest/user_guide/flyte_agents/developing_agents.html))
13+
14+
> In the following command, `localhost:30000` is the Docker registry that ships with the Flyte demo cluster. Use it or replace it with a registry where you have push permissions.
15+
16+
```bash
17+
docker buildx build --platform linux/amd64 -t localhost:30000/flyteagent:custom-agent -f Dockerfile .
18+
```
19+
20+
3. Test the image:
21+
```bash
22+
docker run -it localhost:30000/flyteagent:custom-agent
23+
```
24+
25+
4. Check the logs (sensor is created by flytekit, bigquery and openai is created by the custom agent)
26+
```
27+
(dev) future@outlier ~ % docker run -it localhost:30000/flyteagent:custom-agent
28+
29+
WARNING: The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested
30+
🚀 Starting the agent service...
31+
Starting up the server to expose the prometheus metrics...
32+
Agent Metadata
33+
┏━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┓
34+
┃ Agent Name ┃ Support Task Types ┃ Is Sync ┃
35+
┡━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━┩
36+
│ Sensor │ sensor (v0) │ False │
37+
│ ChatGPT Agent │ chatgpt (v0) │ True │
38+
│ Bigquery Agent │ bigquery_query_job_task (v0) │ False │
39+
└────────────────┴───────────────────────────────┴─────────┘
40+
```
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# Flytekit BigQuery Plugin
2+
3+
BigQuery enables us to build data-intensive applications without operational burden. Flyte backend can be connected with the BigQuery service. Once enabled, it can allow you to query a BigQuery table.
4+
5+
To install the plugin, run the following command:
6+
7+
```bash
8+
pip install flytekitplugins-bigquery
9+
```
10+
11+
To configure BigQuery in the Flyte deployment's backend, follow the [configuration guide](https://docs.flyte.org/en/latest/deployment/plugin_setup/gcp/bigquery.html#deployment-plugin-setup-gcp-bigquery).
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
"""
2+
.. currentmodule:: flytekitplugins.bigquery
3+
4+
This package contains things that are useful when extending Flytekit.
5+
6+
.. autosummary::
7+
:template: custom.rst
8+
:toctree: generated/
9+
10+
BigQueryConfig
11+
BigQueryTask
12+
BigQueryAgent
13+
"""
14+
15+
from .agent import BigQueryAgent
16+
from .task import BigQueryConfig, BigQueryTask
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import datetime
2+
from dataclasses import dataclass
3+
from typing import Dict, Optional
4+
5+
from flyteidl.core.execution_pb2 import TaskExecution, TaskLog
6+
from google.cloud import bigquery
7+
8+
from flytekit import FlyteContextManager, StructuredDataset, logger
9+
from flytekit.core.type_engine import TypeEngine
10+
from flytekit.extend.backend.base_agent import AgentRegistry, AsyncAgentBase, Resource, ResourceMeta
11+
from flytekit.extend.backend.utils import convert_to_flyte_phase
12+
from flytekit.models.literals import LiteralMap
13+
from flytekit.models.task import TaskTemplate
14+
15+
pythonTypeToBigQueryType: Dict[type, str] = {
16+
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#data_type_sizes
17+
list: "ARRAY",
18+
bool: "BOOL",
19+
bytes: "BYTES",
20+
datetime.datetime: "DATETIME",
21+
float: "FLOAT64",
22+
int: "INT64",
23+
str: "STRING",
24+
}
25+
26+
27+
@dataclass
28+
class BigQueryMetadata(ResourceMeta):
29+
job_id: str
30+
project: str
31+
location: str
32+
33+
34+
class BigQueryAgent(AsyncAgentBase):
35+
name = "Bigquery Agent"
36+
37+
def __init__(self):
38+
super().__init__(task_type_name="bigquery_query_job_task", metadata_type=BigQueryMetadata)
39+
40+
def create(
41+
self,
42+
task_template: TaskTemplate,
43+
inputs: Optional[LiteralMap] = None,
44+
**kwargs,
45+
) -> BigQueryMetadata:
46+
job_config = None
47+
if inputs:
48+
ctx = FlyteContextManager.current_context()
49+
python_interface_inputs = {
50+
name: TypeEngine.guess_python_type(lt.type) for name, lt in task_template.interface.inputs.items()
51+
}
52+
native_inputs = TypeEngine.literal_map_to_kwargs(ctx, inputs, python_interface_inputs)
53+
logger.info(f"Create BigQuery job config with inputs: {native_inputs}")
54+
job_config = bigquery.QueryJobConfig(
55+
query_parameters=[
56+
bigquery.ScalarQueryParameter(name, pythonTypeToBigQueryType[python_interface_inputs[name]], val)
57+
for name, val in native_inputs.items()
58+
]
59+
)
60+
61+
custom = task_template.custom
62+
project = custom["ProjectID"]
63+
location = custom["Location"]
64+
client = bigquery.Client(project=project, location=location)
65+
query_job = client.query(task_template.sql.statement, job_config=job_config)
66+
67+
return BigQueryMetadata(job_id=str(query_job.job_id), location=location, project=project)
68+
69+
def get(self, resource_meta: BigQueryMetadata, **kwargs) -> Resource:
70+
client = bigquery.Client()
71+
log_link = TaskLog(
72+
uri=f"https://console.cloud.google.com/bigquery?project={resource_meta.project}&j=bq:{resource_meta.location}:{resource_meta.job_id}&page=queryresults",
73+
name="BigQuery Console",
74+
)
75+
76+
job = client.get_job(resource_meta.job_id, resource_meta.project, resource_meta.location)
77+
if job.errors:
78+
logger.error("failed to run BigQuery job with error:", job.errors.__str__())
79+
return Resource(phase=TaskExecution.FAILED, message=job.errors.__str__(), log_links=[log_link])
80+
81+
cur_phase = convert_to_flyte_phase(str(job.state))
82+
res = None
83+
84+
if cur_phase == TaskExecution.SUCCEEDED:
85+
dst = job.destination
86+
if dst:
87+
output_location = f"bq://{dst.project}:{dst.dataset_id}.{dst.table_id}"
88+
res = {"results": StructuredDataset(uri=output_location)}
89+
90+
return Resource(phase=cur_phase, message=str(job.state), log_links=[log_link], outputs=res)
91+
92+
def delete(self, resource_meta: BigQueryMetadata, **kwargs):
93+
client = bigquery.Client()
94+
client.cancel_job(resource_meta.job_id, resource_meta.project, resource_meta.location)
95+
96+
97+
AgentRegistry.register(BigQueryAgent())
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
from dataclasses import dataclass
2+
from typing import Any, Dict, Optional, Type
3+
4+
from google.protobuf import json_format
5+
from google.protobuf.struct_pb2 import Struct
6+
7+
from flytekit import lazy_module
8+
from flytekit.configuration import SerializationSettings
9+
from flytekit.extend import SQLTask
10+
from flytekit.extend.backend.base_agent import AsyncAgentExecutorMixin
11+
from flytekit.models import task as _task_model
12+
from flytekit.types.structured import StructuredDataset
13+
14+
bigquery = lazy_module("google.cloud.bigquery")
15+
16+
17+
@dataclass
18+
class BigQueryConfig(object):
19+
"""
20+
BigQueryConfig should be used to configure a BigQuery Task.
21+
"""
22+
23+
ProjectID: str
24+
Location: Optional[str] = None
25+
QueryJobConfig: Optional[bigquery.QueryJobConfig] = None
26+
27+
28+
class BigQueryTask(AsyncAgentExecutorMixin, SQLTask[BigQueryConfig]):
29+
"""
30+
This is the simplest form of a BigQuery Task, that can be used even for tasks that do not produce any output.
31+
"""
32+
33+
# This task is executed using the BigQuery handler in the backend.
34+
# https://github.com/flyteorg/flyteplugins/blob/43623826fb189fa64dc4cb53e7025b517d911f22/go/tasks/plugins/webapi/bigquery/plugin.go#L34
35+
_TASK_TYPE = "bigquery_query_job_task"
36+
37+
def __init__(
38+
self,
39+
name: str,
40+
query_template: str,
41+
task_config: BigQueryConfig,
42+
inputs: Optional[Dict[str, Type]] = None,
43+
output_structured_dataset_type: Optional[Type[StructuredDataset]] = None,
44+
**kwargs,
45+
):
46+
"""
47+
To be used to query BigQuery Tables.
48+
49+
:param name: Name of this task, should be unique in the project
50+
:param query_template: The actual query to run. We use Flyte's Golang templating format for Query templating. Refer to the templating documentation
51+
:param task_config: BigQueryConfig object
52+
:param inputs: Name and type of inputs specified as an ordered dictionary
53+
:param output_structured_dataset_type: If some data is produced by this query, then you can specify the output StructuredDataset type
54+
:param kwargs: All other args required by Parent type - SQLTask
55+
"""
56+
outputs = None
57+
if output_structured_dataset_type is not None:
58+
outputs = {
59+
"results": output_structured_dataset_type,
60+
}
61+
super().__init__(
62+
name=name,
63+
task_config=task_config,
64+
query_template=query_template,
65+
inputs=inputs,
66+
outputs=outputs,
67+
task_type=self._TASK_TYPE,
68+
**kwargs,
69+
)
70+
self._output_structured_dataset_type = output_structured_dataset_type
71+
72+
def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
73+
config = {
74+
"Location": self.task_config.Location,
75+
"ProjectID": self.task_config.ProjectID,
76+
}
77+
if self.task_config.QueryJobConfig is not None:
78+
config.update(self.task_config.QueryJobConfig.to_api_repr()["query"])
79+
s = Struct()
80+
s.update(config)
81+
return json_format.MessageToDict(s)
82+
83+
def get_sql(self, settings: SerializationSettings) -> Optional[_task_model.Sql]:
84+
sql = _task_model.Sql(statement=self.query_template, dialect=_task_model.Sql.Dialect.ANSI)
85+
return sql
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from setuptools import setup
2+
3+
PLUGIN_NAME = "bigquery"
4+
5+
microlib_name = f"flytekitplugins-{PLUGIN_NAME}"
6+
7+
plugin_requires = [
8+
"flytekit>1.10.7",
9+
"google-cloud-bigquery>=3.21.0",
10+
"google-cloud-bigquery-storage>=2.25.0",
11+
"flyteidl>1.10.7",
12+
]
13+
14+
__version__ = "0.0.0+develop"
15+
16+
setup(
17+
name=microlib_name,
18+
version=__version__,
19+
author="flyteorg",
20+
author_email="[email protected]",
21+
description="This package holds the Bigquery plugins for flytekit",
22+
namespace_packages=["flytekitplugins"],
23+
packages=[f"flytekitplugins.{PLUGIN_NAME}"],
24+
install_requires=plugin_requires,
25+
license="apache2",
26+
python_requires=">=3.9",
27+
classifiers=[
28+
"Intended Audience :: Science/Research",
29+
"Intended Audience :: Developers",
30+
"License :: OSI Approved :: Apache Software License",
31+
"Programming Language :: Python :: 3.9",
32+
"Programming Language :: Python :: 3.10",
33+
"Topic :: Scientific/Engineering",
34+
"Topic :: Scientific/Engineering :: Artificial Intelligence",
35+
"Topic :: Software Development",
36+
"Topic :: Software Development :: Libraries",
37+
"Topic :: Software Development :: Libraries :: Python Modules",
38+
],
39+
entry_points={"flytekit.plugins": [f"{PLUGIN_NAME}=flytekitplugins.{PLUGIN_NAME}"]},
40+
)
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# OpenAI Plugins
2+
3+
The plugin currently features ChatGPT and Batch API agents.
4+
5+
To install the plugin, run the following command:
6+
7+
```bash
8+
pip install flytekitplugins-openai
9+
```
10+
11+
## ChatGPT
12+
13+
The ChatGPT plugin allows you to run ChatGPT tasks within the Flyte workflow without requiring any code changes.
14+
15+
```python
16+
from flytekit import task, workflow
17+
from flytekitplugins.openai import ChatGPTTask, ChatGPTConfig
18+
19+
chatgpt_small_job = ChatGPTTask(
20+
name="chatgpt gpt-3.5-turbo",
21+
openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
22+
chatgpt_config={
23+
"model": "gpt-3.5-turbo",
24+
"temperature": 0.7,
25+
},
26+
)
27+
28+
chatgpt_big_job = ChatGPTTask(
29+
name="chatgpt gpt-4",
30+
openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7",
31+
chatgpt_config={
32+
"model": "gpt-4",
33+
"temperature": 0.7,
34+
},
35+
)
36+
37+
38+
@workflow
39+
def wf(message: str) -> str:
40+
message = chatgpt_small_job(message=message)
41+
message = chatgpt_big_job(message=message)
42+
return message
43+
44+
45+
if __name__ == "__main__":
46+
print(wf(message="hi"))
47+
```
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
"""
2+
.. currentmodule:: flytekitplugins.openai
3+
4+
.. autosummary::
5+
:template: custom.rst
6+
:toctree: generated/
7+
8+
DownloadJSONFilesTask
9+
UploadJSONLFileTask
10+
OpenAIFileConfig
11+
ChatGPTAgent
12+
ChatGPTTask
13+
"""
14+
15+
from .chatgpt.agent import ChatGPTAgent
16+
from .chatgpt.task import ChatGPTTask

basic-custom-agent/{{cookiecutter.project_name}}/flytekit-openai/flytekitplugins/openai/chatgpt/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)