Towards Building a Data Steward Agent for Your Data Governance Tool
Introduction
2025 seems to be the year of AI agents. Every enterprise is looking for ways to increase productivity with generative AI and AI agents. In 2024 companies examined many generative AI use cases, and by now generative AI is no longer an enigma: enterprises have been using and evaluating it. Its use has also spread quickly among non-technical people, and AI agents are now an emerging technology that is likely to make an even bigger impact.
Various frameworks focus on agents, such as LangChain, CrewAI and the Bee Agent Framework, with more to come. A great deal of effort has gone into creating these frameworks and ensuring that they fit the definition of an agent framework.
In this article I focus less on the agent frameworks and more on what needs to be done before getting started with them. We can all agree that agents are here to improve the work of knowledge workers. Therefore, before getting down and dirty with any of these frameworks, we need to understand who our knowledge worker is and what they do.
For this article I explore the role of a data steward and attempt to create a data steward agent for a data governance tool, to help a data steward be more productive. Even though a data steward interacts with other roles within the data governance team, I limit the scope of this article to the role itself. I will explore multi-agent interaction within a data governance team in another article. Let’s start with the data steward and see how we can create an AI agent for this role.
Data Steward
According to ChatGPT, here is the definition of a data steward:
As you can see, data steward is a big role: it requires the person assuming it to have knowledge of data, data management and data quality, as well as an understanding of the organization’s data governance requirements, and much more. This role needs to navigate various aspects of data so that the teams consuming the data can use the right data at the right time for the right reasons with the right privileges.
This person uses various applications and tools, and communicates with various people within the organization, to accomplish their tasks.
Let’s first identify the tools that a data steward uses. Data stewards use a variety of tools to manage, monitor, and ensure the quality, security, and governance of an organization’s data assets. Each enterprise can define the data steward role differently and distribute some expectations to other interacting roles; however, here is a set of tools that a data steward may use:
Now let’s identify the people a data steward interacts with. As you can see below, the data steward has quite a central role in data governance. Again, this list may differ from enterprise to enterprise.
For now, let’s focus on the data governance tools and look at the data steward alone, without getting into its interaction with other roles. We want to see how we can create a data steward agent that completes tasks related to data governance. Let’s explore one of the tasks of a data steward. For this article I use IBM Knowledge Catalog as the governance tool.
Manage governance artifacts
In this article I focus on one of the responsibilities of a data steward: managing governance artifacts. Governance artifacts are a set of objects within IBM Knowledge Catalog (IKC) that are used to govern data. As shown in the figure below, each category can:
- Contain other categories
- Contain business terms, classifications, data classes, reference data sets and policies
Each policy can:
- Contain other policies
- Contain governance rules, data protection rules and SLA DQ rules
Basically, categories are folder-like objects that organize all these governance artifacts, which in turn are used to govern data assets.
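To make the containment rules concrete, here is a minimal sketch of the object model as plain Python dataclasses. The class and field names (`Category`, `Policy`, `artifacts`, `rules`) are illustrative assumptions and not IKC API types.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterator, List


@dataclass
class Policy:
    """A policy can contain other policies and rules (illustrative model only)."""
    name: str
    sub_policies: List["Policy"] = field(default_factory=list)
    # Rule names grouped by kind, e.g. "governance_rule", "data_protection_rule".
    rules: Dict[str, List[str]] = field(default_factory=dict)


@dataclass
class Category:
    """A folder-like container for governance artifacts (illustrative model only)."""
    name: str
    subcategories: List["Category"] = field(default_factory=list)
    # Artifact names grouped by kind, e.g. "business_term", "data_class".
    artifacts: Dict[str, List[str]] = field(default_factory=dict)
    policies: List[Policy] = field(default_factory=list)

    def add_artifact(self, kind: str, name: str) -> None:
        """Register an artifact name under the given kind."""
        self.artifacts.setdefault(kind, []).append(name)

    def walk(self) -> Iterator["Category"]:
        """Yield this category and every nested subcategory, depth first."""
        yield self
        for child in self.subcategories:
            yield from child.walk()


support = Category("support")
support.add_artifact("business_term", "device")
support.add_artifact("business_term", "ticket")
root = Category("IT", subcategories=[support, Category("development")])
category_names = [c.name for c in root.walk()]
print(category_names)
```

The nesting of `Category` inside `Category` mirrors the "folder" behaviour described above; a real integration would read and write these objects through the governance API rather than hold them in memory.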
Now that we are familiar with the object model, let’s explore what managing governance artifacts means. As per the documentation, it means to create, edit, find, view, delete, import and export the governance artifacts mentioned above.
In this article we are going to create an agent that can manage governance artifacts. We want to be able to give the agent requests like:
1- Create three categories called IT, support and development
2- Add two business terms “device” and “ticket” under the “support” subcategory
3- View all the data classes under the development subcategory
The agent should be able to perform these actions. The requests above may seem very simple, but they require several interactions with IBM Knowledge Catalog. Let’s explore the first one:
Create three categories called IT, support and development
To perform this action we need to:
1- Create a category called IT
2- Create a category called support
3- Create a category called development
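Written in the structured form used later in this article (an artifact type plus a list of actions), the decomposition above could look like the following sketch. The helper name `expand_create_request` is hypothetical.

```python
from typing import Dict, List


def expand_create_request(artifact_type: str, names: List[str]) -> Dict:
    """Turn one 'create several things' request into one create action per name."""
    return {
        "artifact_type": artifact_type,
        "actions": [{"type": "create", "name": name} for name in names],
    }


plan = expand_create_request("category", ["IT", "support", "development"])
for action in plan["actions"]:
    print(action["type"], action["name"])
```

In the rest of the article this expansion is not hand-written; it is what we expect the LLM to infer from the user's sentence.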
Agent Design
We are going to create three agents:
- Category manager agent
- Business term manager agent
- Catalog search agent
These agents are just for the scope of this article. We can add agents for other governance artifacts as well, such as a policy manager agent or a classification manager agent.
For each of the agents above we need to provide a role, a goal and a backstory:
from crewai import Agent

# Agents
# The tool functions referenced here (create_category, delete_category,
# execute_lucene_query) are shown later in this article; in a single script
# they must be defined before the agents.
category_manager = Agent(
    role="Category Manager",
    goal="Manage categories, including creation, deletion, and organization.",
    backstory="An expert in handling category hierarchies and ensuring proper organization.",
    verbose=True,
    tools=[create_category, delete_category],
)

business_term_manager = Agent(
    role="Business Term Manager",
    goal="Manage business terms, including creation, deletion, and retrieval.",
    backstory="An experienced manager of business terms, focusing on governance and compliance.",
    verbose=True,
    tools=[],
)

lucene_search_agent = Agent(
    role="Search Manager",
    goal="Execute complex queries to search for governance artifacts.",
    backstory="A powerful agent capable of searching across large datasets with sophisticated query capabilities.",
    verbose=True,
    tools=[execute_lucene_query],
)
When the user submits a request, the application should be able to relay it to the right agent. In a lot of tutorials the tasks are hard-coded. In this article we use a technique to create tasks dynamically and pass them to the right agents, which execute the right tool based on the user's entry. If the request is to create categories, a task is created for the category manager and delegated to it. If we are searching for a business term, the search agent takes care of that task.
To accomplish this we use an LLM to create structured JSON output, which we can then use to decide which agent is responsible for the request.
We basically create a prompt for the LLM to infer the actions that need to be done, given the text from the user:
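The routing rule described above can be sketched as a lookup from artifact type and action type to an agent. The table and the function name `pick_agent` are assumptions for illustration, and the string values stand in for the agent objects defined earlier.

```python
from typing import Dict

# Hypothetical routing table: artifact type -> responsible agent.
AGENT_BY_ARTIFACT: Dict[str, str] = {
    "category": "category_manager",
    "business_term": "business_term_manager",
}
SEARCH_AGENT = "lucene_search_agent"


def pick_agent(artifact_type: str, action_type: str) -> str:
    """Return the name of the agent that should handle one action."""
    if action_type == "search":
        # Searches go to the search agent regardless of artifact type.
        return SEARCH_AGENT
    try:
        return AGENT_BY_ARTIFACT[artifact_type]
    except KeyError:
        raise ValueError(f"No agent registered for artifact type '{artifact_type}'")


print(pick_agent("category", "create"))
print(pick_agent("business_term", "search"))
```

Keeping the mapping in a table means that supporting a new artifact type, such as policies, only requires a new entry rather than a new branch.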
import json

from openai import OpenAI

# Assumes the OPENAI_API_KEY environment variable is set.
client = OpenAI()


def infer_actions_with_llm(user_input: str) -> dict:
    """
    Interpret the user input using LLM and return inferred actions.
    """
    prompt = f"""
    Interpret the following input and infer the actions required to manage governance artifacts (categories, business terms, etc.).
    Output a JSON structure describing the artifact type and required actions.
    Input: "{user_input}"
    Example Output:
    {{
        "artifact_type": "business_term",
        "actions": [
            {{
                "type": "create",
                "name": "Confidentiality",
                "description": "This term defines data privacy and confidentiality."
            }},
            {{
                "type": "delete",
                "id": "12345"
            }}
        ]
    }}
    or
    {{
        "artifact_type": "category",
        "actions": [
            {{
                "type": "search",
                "query": "Finance"
            }}
        ]
    }}
    """
    try:
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a helpful assistant for structuring user inputs."},
                {"role": "user", "content": prompt},
            ],
            max_tokens=1500,
            temperature=0.7,
        )
        # Parse the reply as JSON. json.loads is used instead of eval so that
        # model output is never executed as Python code.
        return json.loads(response.choices[0].message.content.strip())
    except Exception as e:
        raise ValueError(f"Error in LLM inference: {e}")
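Because the model's reply is free text, it can help to parse and validate it before building tasks. The following is a minimal sketch that assumes the reply may be wrapped in a Markdown code fence; `parse_llm_json` and the set of allowed action types are assumptions drawn from the prompt examples, not part of the original code.

```python
import json
from typing import Any, Dict

FENCE = "`" * 3  # Markdown code fence marker
ALLOWED_ACTION_TYPES = {"create", "delete", "search"}  # assumed from the prompt examples


def parse_llm_json(raw: str) -> Dict[str, Any]:
    """Parse an LLM reply and check the fields the task builder relies on."""
    text = raw.strip()
    if text.startswith(FENCE):
        # Drop the opening fence line (with any language tag) and the closing fence.
        text = text.split("\n", 1)[1] if "\n" in text else ""
        text = text.rsplit(FENCE, 1)[0]
    data = json.loads(text)
    if not isinstance(data, dict) or "artifact_type" not in data:
        raise ValueError("Missing 'artifact_type' in LLM output")
    actions = data.get("actions")
    if not isinstance(actions, list):
        raise ValueError("'actions' must be a list")
    for action in actions:
        if not isinstance(action, dict) or action.get("type") not in ALLOWED_ACTION_TYPES:
            raise ValueError(f"Unsupported action: {action!r}")
    return data


reply = FENCE + 'json\n{"artifact_type": "category", "actions": [{"type": "create", "name": "IT"}]}\n' + FENCE
parsed = parse_llm_json(reply)
print(parsed["artifact_type"], len(parsed["actions"]))
```

Failing early with a clear `ValueError` keeps a malformed reply from turning into a half-built list of tasks.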
Then we generate tasks for the given actions and their corresponding artifact type:
from crewai import Task

# create_business_term and delete_business_term are business term tools that
# are not shown in this article.


def generate_tasks_from_actions(actions: list, artifact_type: str) -> list:
    """
    Create tasks dynamically based on inferred actions.
    """
    tasks = []
    for action in actions:
        task = None  # stays None when the action is not recognized
        if artifact_type == "category":
            # Map category actions to tools
            if action["type"] == "create":
                task = Task(
                    description=f"Create category '{action['name']}'.",
                    expected_output="Confirmation of category creation by returning the JSON from the request.",
                    tools=[create_category],
                    agent=category_manager,
                )
            elif action["type"] == "delete":
                task = Task(
                    description=f"Delete category with ID '{action['id']}'.",
                    expected_output="Confirmation of category deletion.",
                    tools=[delete_category],
                    agent=category_manager,
                )
            elif action["type"] == "search":
                task = Task(
                    description=f"Search for categories with query '{action['query']}'.",
                    expected_output="Search results containing matching categories.",
                    tools=[execute_lucene_query],
                    agent=category_manager,
                )
        elif artifact_type == "business_term":
            # Map business term actions to tools
            if action["type"] == "create":
                task = Task(
                    description=f"Create business term '{action['name']}' with description '{action['description']}'.",
                    expected_output="Confirmation of business term creation.",
                    tools=[create_business_term],
                    agent=business_term_manager,
                )
            elif action["type"] == "delete":
                task = Task(
                    description=f"Delete business term with ID '{action['id']}'.",
                    expected_output="Confirmation of business term deletion.",
                    tools=[delete_business_term],
                    agent=business_term_manager,
                )
            elif action["type"] == "search":
                task = Task(
                    description=f"Search for business terms with query '{action['query']}'.",
                    expected_output="Search results containing matching business terms.",
                    tools=[execute_lucene_query],
                    agent=business_term_manager,
                )
        elif artifact_type == "search":
            # General search action (for all artifact types)
            task = Task(
                description=f"Perform a search with query '{action['query']}'.",
                expected_output="Search results containing matching artifacts.",
                tools=[execute_lucene_query],
                agent=business_term_manager if "business_term" in action.get("artifact_type", "") else category_manager,
            )
        # Skip unrecognized actions instead of appending a stale or undefined task
        if task is not None:
            tasks.append(task)
    return tasks
The tools are basically API calls to our governance platform:
import requests

# `tool` is CrewAI's tool decorator. GOVERNANCE_API_URL and GOVERNANCE_API_KEY
# are configuration values that must be defined before these tools are called.


@tool
def execute_lucene_query(query: str, is_simple: bool = True, limit: int = 100, role: str = "viewer") -> str:
    """
    Execute a Lucene syntax query to search for assets and artifacts.
    Args:
        query (str): The search query in Lucene syntax.
        is_simple (bool): Whether to use simple query syntax. Default is True.
        limit (int): The maximum number of results to return. Default is 100.
        role (str): Role access for governance artifacts. Default is "viewer".
    Returns:
        str: JSON response containing search results or an error message.
    """
    headers = {
        "Authorization": f"Bearer {GOVERNANCE_API_KEY}",
        "Run-as-Tenant": "999",
        "Content-Type": "application/json",
    }
    params = {
        "query": query,
        "isSimple": is_simple,
        "limit": limit,
        "role": role,
        "auth_scope": "all",
    }
    response = requests.get(f"{GOVERNANCE_API_URL}/v3/search", headers=headers, params=params)
    if response.status_code == 200:
        return f"Search results:\n{response.json()}"
    else:
        return f"Error: {response.status_code} - {response.text}"


@tool
def create_category(category_name: str) -> str:
    """
    Create a category in the governance system.
    Args:
        category_name (str): The name of the category to create.
    Returns:
        str: Success message or error message.
    """
    headers = {"Authorization": f"Bearer {GOVERNANCE_API_KEY}", "Content-Type": "application/json"}
    payload = {"name": category_name}
    response = requests.post(f"{GOVERNANCE_API_URL}/v3/categories", json=payload, headers=headers)
    return f"Category '{category_name}' created successfully." if response.status_code == 201 else f"Error: {response.text}"


@tool
def delete_category(category_id: str) -> str:
    """
    Delete a category from the governance system.
    Args:
        category_id (str): The ID of the category to delete.
    Returns:
        str: Success message or error message.
    """
    headers = {"Authorization": f"Bearer {GOVERNANCE_API_KEY}"}
    response = requests.delete(f"{GOVERNANCE_API_URL}/v3/categories/{category_id}", headers=headers)
    return "Category deleted successfully." if response.status_code == 204 else f"Error: {response.text}"
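One way to exercise tools like these without touching a live governance instance is to pass the HTTP call in as a parameter and substitute a fake in tests. This is a sketch under assumptions: `create_category_with`, `FakeResponse`, `fake_post` and the example URL are hypothetical, and the endpoint path simply mirrors the one used above.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class FakeResponse:
    """Stand-in for the response object returned by an HTTP client."""
    status_code: int
    text: str = ""


def create_category_with(post: Callable[..., FakeResponse], base_url: str,
                         api_key: str, category_name: str) -> str:
    """Same logic as create_category, but with the HTTP call injected."""
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    response = post(f"{base_url}/v3/categories", json={"name": category_name}, headers=headers)
    if response.status_code == 201:
        return f"Category '{category_name}' created successfully."
    return f"Error: {response.text}"


calls: List[Dict] = []


def fake_post(url: str, json: Dict, headers: Dict) -> FakeResponse:
    """Record the request and pretend the server created the category."""
    calls.append({"url": url, "json": json})
    return FakeResponse(status_code=201)


message = create_category_with(fake_post, "https://governance.example", "dummy-key", "finance")
print(message)
```

In production the same function would receive `requests.post` as its first argument, so the tested logic and the deployed logic stay identical.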
Finally, the main function orchestrates everything:
from crewai import Crew, Process


# Main Function
def main():
    user_inputs = [
        "Create 3 categories : finance, IT and HR"
    ]
    for user_input in user_inputs:
        print(f"Processing request: {user_input}")
        # Step 1: Interpret input
        inferred_data = infer_actions_with_llm(user_input)
        artifact_type = inferred_data.get("artifact_type")
        actions = inferred_data.get("actions", [])
        print(actions)
        # Step 2: Generate tasks
        tasks = generate_tasks_from_actions(actions, artifact_type)
        # Step 3: Create and kick off the crew. Every agent that a generated
        # task can be assigned to is registered with the crew.
        crew = Crew(
            agents=[category_manager, business_term_manager],
            tasks=tasks,
            process=Process.sequential,
        )
        result = crew.kickoff(inputs={})
        print(f"Execution Result:\n{result}")


if __name__ == "__main__":
    main()
Results
Now let’s look at how the system behaves when we ask it to:
1- Create 3 categories: finance, IT and HR
The results are propagated to the IBM Knowledge Catalog user interface.
We can even ask for information about our governance artifacts:
2- View information about the finance category
Resources:
Code : https://github.com/ijgitsh/DataStewardAgent_IKC
Crew AI : https://docs.crewai.com/introduction
IBM Knowledge Catalog : https://www.ibm.com/products/knowledge-catalog
Watson Data API : https://cloud.ibm.com/apidocs/watson-data-api-cpd