When starting the workflow from the terminal, it is straightforward to see which step it is executing and the logging we put in those steps.
We can also enable human-in-the-loop interaction by simply using user_feedback = input() in the workflow. This pauses the workflow and waits for user input (see the human-in-the-loop example in this official LlamaIndex notebook). However, to achieve the same functionality in a user-friendly interface, we need additional modifications to the original workflow.
Workflows can take a long time to execute, so for a better user experience, LlamaIndex provides a way to send streaming events that indicate the progress of the workflow, as shown in the notebook here. In my workflow, I define a WorkflowStreamingEvent class to carry useful information about the event message, such as the type of the event and the step it was sent from:
from typing import Any, Dict, Literal

from pydantic import BaseModel, Field


class WorkflowStreamingEvent(BaseModel):
    event_type: Literal["server_message", "request_user_input"] = Field(
        ..., description="Type of the event"
    )
    event_sender: str = Field(
        ..., description="Sender (workflow step name) of the event"
    )
    event_content: Dict[str, Any] = Field(..., description="Content of the event")
To enable sending streaming events, the workflow step needs access to the shared context, which is done by adding the @step(pass_context=True) decorator to the step definition. Then, in the step definition, we can send event messages about the progress through the context. For example, in the tavily_query() step:
@step(pass_context=True)
async def tavily_query(self, ctx: Context, ev: StartEvent) -> TavilyResultsEvent:
    ctx.data["research_topic"] = ev.user_query
    query = f"arxiv papers about the state of the art of {ev.user_query}"
    ctx.write_event_to_stream(
        Event(
            msg=WorkflowStreamingEvent(
                event_type="server_message",
                event_sender=inspect.currentframe().f_code.co_name,
                event_content={"message": f"Querying Tavily with: '{query}'"},
            ).model_dump()
        )
    )
In this example, we set the event_type to "server_message", which means that it is an update message and no user action is required. We have another type of event, "request_user_input", that indicates user input is needed. For example, in the gather_feedback_outline() step of the workflow, after generating the slide text outlines from the original paper summary, a message is sent to prompt the user to approve the outline text or provide feedback on it:
@step(pass_context=True)
async def gather_feedback_outline(
    self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
    """Present user the original paper summary and the outlines generated, gather feedback from user"""
    ...
    # Send a special event indicating that user input is needed
    ctx.write_event_to_stream(
        Event(
            msg=json.dumps(
                {
                    "event_type": "request_user_input",
                    "event_sender": inspect.currentframe().f_code.co_name,
                    "event_content": {
                        "summary": ev.summary,
                        "outline": ev.outline.dict(),
                        "message": "Do you approve this outline? If not, please provide feedback.",
                    },
                }
            )
        )
    )
    ...
These events are handled differently in the backend API and the frontend logic, which I will describe in detail in the later sections of this article.
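Both steps above fill event_sender with inspect.currentframe().f_code.co_name, which evaluates to the name of the function that is currently executing. As a minimal sketch of how that lookup behaves, the snippet below moves it into a helper; current_step_name() is a hypothetical name that does not appear in the original workflow, and the payload is a plain dict rather than a real workflow event.

```python
import inspect


def current_step_name() -> str:
    """Return the name of the function that called this helper (hypothetical helper)."""
    # currentframe() is this helper's own frame; f_back is the caller's frame.
    return inspect.currentframe().f_back.f_code.co_name


def tavily_query() -> dict:
    # Same keys as the streamed payload in the article, without the workflow machinery.
    return {
        "event_type": "server_message",
        "event_sender": current_step_name(),
        "event_content": {"message": "Querying Tavily"},
    }


payload = tavily_query()
print(payload["event_sender"])
```

Because the helper adds one extra frame, it has to read f_back; calling inspect.currentframe().f_code.co_name inline, as the article does, reads the step's own frame directly.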
When sending a "request_user_input" event to the user, we only want to proceed to the next step after we have received the user input. As shown in the workflow diagram above, the workflow either proceeds to the outlines_with_layout() step if the user approves the outline, or goes back to the summary2outline() step if the user does not approve.
This is achieved using the Future() object from Python's asyncio library. In the SlideGenerationWorkflow class, we set an attribute self.user_input_future = asyncio.Future() that can be awaited in the gather_feedback_outline() step. The subsequent execution of the workflow is conditioned on the content of the user feedback:
@step(pass_context=True)
async def gather_feedback_outline(
    self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
    ...
    # Wait for user input. Awaiting a future that is already done returns its
    # result immediately, so user_response is always bound after this line.
    user_response = await self.user_input_future
    logger.info(f"gather_feedback_outline: Got user response: {user_response}")
    # Process user_response, which should be a JSON string
    try:
        response_data = json.loads(user_response)
        approval = response_data.get("approval", "").lower().strip()
        feedback = response_data.get("feedback", "").strip()
    except json.JSONDecodeError:
        # Handle invalid JSON
        logger.error("Invalid user response format")
        raise Exception("Invalid user response format")
    if approval == ":material/thumb_up:":
        return OutlineOkEvent(summary=ev.summary, outline=ev.outline)
    else:
        return OutlineFeedbackEvent(
            summary=ev.summary, outline=ev.outline, feedback=feedback
        )
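The pause-and-resume mechanism can be tried in isolation, without LlamaIndex. The following is a minimal sketch under a few assumptions: FeedbackGate and wait_for_user() are hypothetical names, the future is created inside the running loop with loop.create_future() rather than in a constructor, and a fresh future is created for every round so the step can be repeated when the user rejects an outline.

```python
import asyncio
import json


class FeedbackGate:
    """Hypothetical stand-in for the workflow's user_input_future attribute."""

    def __init__(self) -> None:
        self.user_input_future = None

    async def wait_for_user(self) -> dict:
        # Create the future inside the running loop so it is bound to that loop.
        loop = asyncio.get_running_loop()
        self.user_input_future = loop.create_future()
        raw = await self.user_input_future  # suspends here until set_result()
        # Drop the used future so the next round starts fresh (assumption).
        self.user_input_future = None
        return json.loads(raw)


async def demo() -> list:
    gate = FeedbackGate()
    decisions = []
    for approval in ("rejected", "approved"):
        waiter = asyncio.create_task(gate.wait_for_user())
        await asyncio.sleep(0)  # let the waiter run up to its await
        # Stands in for the backend endpoint receiving the user's answer.
        gate.user_input_future.set_result(json.dumps({"approval": approval}))
        decisions.append((await waiter)["approval"])
    return decisions


print(asyncio.run(demo()))
```

A future can only be resolved once, so a workflow that may loop back to the feedback step more than once needs some way to obtain a new future per round; replacing it as shown here is one option, not necessarily what the original workflow does.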
We set up the backend using FastAPI and expose a POST endpoint that handles requests and initiates the workflow run. The asynchronous function run_workflow_endpoint() takes a ResearchTopic as input. Inside the function, an asynchronous generator event_generator() is defined, which creates a task to run the workflow and streams the events to the client as the workflow progresses. When the workflow finishes, it also streams the final file results to the client.
class ResearchTopic(BaseModel):
    query: str = Field(..., example="example query")


@app.post("/run-slide-gen")
async def run_workflow_endpoint(topic: ResearchTopic):
    workflow_id = str(uuid.uuid4())
    wf = SummaryAndSlideGenerationWorkflow(wid=workflow_id, timeout=2000, verbose=True)
    wf.add_workflows(
        summary_gen_wf=SummaryGenerationWorkflow(
            wid=workflow_id, timeout=800, verbose=True
        )
    )
    wf.add_workflows(
        slide_gen_wf=SlideGenerationWorkflow(
            wid=workflow_id, timeout=1200, verbose=True
        )
    )

    async def event_generator():
        loop = asyncio.get_running_loop()
        logger.debug(f"event_generator: loop id {id(loop)}")
        yield f"{json.dumps({'workflow_id': workflow_id})}\n\n"
        task = asyncio.create_task(wf.run(user_query=topic.query))
        logger.debug(f"event_generator: Created task {task}")
        try:
            async for ev in wf.stream_events():
                logger.info(f"Sending message to frontend: {ev.msg}")
                yield f"{ev.msg}\n\n"
                await asyncio.sleep(0.1)  # Small sleep to ensure proper chunking
            final_result = await task
            # Construct the download URL
            download_pptx_url = f"http://backend:80/download_pptx/{workflow_id}"
            download_pdf_url = f"http://backend:80/download_pdf/{workflow_id}"
            final_result_with_url = {
                "result": final_result,
                "download_pptx_url": download_pptx_url,
                "download_pdf_url": download_pdf_url,
            }
            yield f"{json.dumps({'final_result': final_result_with_url})}\n\n"
        except Exception as e:
            error_message = f"Error in workflow: {str(e)}"
            logger.error(error_message)
            yield f"{json.dumps({'event': 'error', 'message': error_message})}\n\n"
        finally:
            # Clean up
            workflows.pop(workflow_id, None)

    return StreamingResponse(event_generator(), media_type="text/event-stream")
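The framing used by event_generator() is one JSON document per chunk, followed by a blank line. The sketch below reproduces only that framing with the standard library, so it can be run without FastAPI or a workflow; the event_stream() function, the queue that feeds it, and the None sentinel are all assumptions made for illustration.

```python
import asyncio
import json


async def event_stream(workflow_id: str, events: asyncio.Queue):
    """Yield one JSON document per chunk, mirroring the endpoint's framing."""
    yield json.dumps({"workflow_id": workflow_id}) + "\n\n"
    while True:
        item = await events.get()
        if item is None:  # sentinel (assumption): the producer is finished
            break
        yield json.dumps(item) + "\n\n"
    yield json.dumps({"final_result": {"result": "done"}}) + "\n\n"


async def demo() -> list:
    events = asyncio.Queue()
    await events.put(
        {
            "event_type": "server_message",
            "event_sender": "step_a",
            "event_content": {"message": "working"},
        }
    )
    await events.put(None)
    chunks = [chunk async for chunk in event_stream("wf-1", events)]
    # A client splits on newlines and skips the blank separator lines.
    return [json.loads(line) for chunk in chunks for line in chunk.splitlines() if line]


print(asyncio.run(demo()))
```

Since every message is a complete JSON document on its own line, the client can parse the stream line by line and decide what to do from the keys it finds (workflow_id, event_type, or final_result).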
In addition to this endpoint, there are endpoints for receiving user input from the client and for handling file download requests. Since each workflow is assigned a unique workflow ID, we can map the user input received from the client to the correct workflow. By calling set_result() on the awaited Future, the pending workflow can resume execution.
@app.post("/submit_user_input")
async def submit_user_input(data: dict = Body(...)):
    workflow_id = data.get("workflow_id")
    user_input = data.get("user_input")
    wf = workflows.get(workflow_id)
    if wf and wf.user_input_future:
        loop = wf.user_input_future.get_loop()  # Get the loop from the future
        logger.info(f"submit_user_input: wf.user_input_future loop id {id(loop)}")
        if not wf.user_input_future.done():
            loop.call_soon_threadsafe(wf.user_input_future.set_result, user_input)
            logger.info("submit_user_input: set_result called")
        else:
            logger.info("submit_user_input: future already done")
        return {"status": "input received"}
    else:
        raise HTTPException(
            status_code=404, detail="Workflow not found or future not initialized"
        )
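The endpoint resolves the future through loop.call_soon_threadsafe() instead of calling set_result() directly. asyncio futures are not thread-safe, so this is the safe way to resolve one from code that may be running outside the loop that owns it. Here is a minimal standard-library sketch of that pattern; the function names are hypothetical, and a plain thread stands in for whatever delivers the user input.

```python
import asyncio
import threading


async def wait_for_input() -> str:
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def submit_from_other_thread() -> None:
        # Hand set_result to the loop that owns the future instead of
        # calling it from this thread.
        future.get_loop().call_soon_threadsafe(future.set_result, "approved")

    worker = threading.Thread(target=submit_from_other_thread)
    worker.start()
    result = await future  # resumes once the loop runs the scheduled callback
    worker.join()
    return result


print(asyncio.run(wait_for_input()))
```

If the endpoint and the workflow share one event loop, a direct set_result() call would presumably work as well; going through the owning loop simply avoids having to rely on that.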
The download endpoint also identifies where the final file is located based on the workflow ID.
@app.get("/download_pptx/{workflow_id}")
async def download_pptx(workflow_id: str):
    file_path = (
        Path(settings.WORKFLOW_ARTIFACTS_PATH)
        / "SlideGenerationWorkflow"
        / workflow_id
        / "final.pptx"
    )
    if file_path.exists():
        return FileResponse(
            path=file_path,
            media_type="application/vnd.openxmlformats-officedocument.presentationml.presentation",
            filename="final.pptx",
        )
    else:
        raise HTTPException(status_code=404, detail="File not found")
On the frontend page, after the user submits the research topic through st.text_input(), a long-running process is started in a background thread with its own event loop. It receives the streamed events from the backend without interfering with the rest of the page:
def start_long_running_task(url, payload, message_queue, user_input_event):
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(
            get_stream_data(url, payload, message_queue, user_input_event)
        )
        loop.close()
    except Exception as e:
        message_queue.put(("error", f"Exception in background thread: {str(e)}"))


...


def main():
    ...
    with st.sidebar:
        with st.form(key="slide_gen_form"):
            query = st.text_input(
                "Enter the topic of your research:",
            )
            submit_button = st.form_submit_button(label="Submit")

    if submit_button:
        # Reset the workflow_complete flag for a new workflow
        st.session_state.workflow_complete = False
        # Start the long-running task in a separate thread
        if (
            st.session_state.workflow_thread is None
            or not st.session_state.workflow_thread.is_alive()
        ):
            st.write("Starting the background thread...")
            st.session_state.workflow_thread = threading.Thread(
                target=start_long_running_task,
                args=(
                    "http://backend:80/run-slide-gen",
                    {"query": query},
                    st.session_state.message_queue,
                    st.session_state.user_input_event,
                ),
            )
            st.session_state.workflow_thread.start()
            st.session_state.received_lines = []
        else:
            st.write("Background thread is already running.")
The event data streamed from the backend is fetched by httpx.AsyncClient and put into a message queue for further processing. Different information is extracted depending on the event type. For the event type "request_user_input", the thread is also paused until the user input is provided.
async def fetch_streaming_data(url: str, payload: dict = None):
    async with httpx.AsyncClient(timeout=1200.0) as client:
        async with client.stream("POST", url=url, json=payload) as response:
            async for line in response.aiter_lines():
                if line:
                    yield line


async def get_stream_data(url, payload, message_queue, user_input_event):
    # message_queue.put(("message", "Starting to fetch streaming data..."))
    data_json = None
    async for data in fetch_streaming_data(url, payload):
        if data:
            try:
                data_json = json.loads(data)
                if "workflow_id" in data_json:
                    # Send workflow_id to main thread
                    message_queue.put(("workflow_id", data_json["workflow_id"]))
                    continue
                elif "final_result" in data_json:
                    # Send final_result to main thread
                    message_queue.put(("final_result", data_json["final_result"]))
                    continue
                event_type = data_json.get("event_type")
                event_sender = data_json.get("event_sender")
                event_content = data_json.get("event_content")
                if event_type in ["request_user_input"]:
                    # Send the message to the main thread
                    message_queue.put(("user_input_required", data_json))
                    # Wait until user input is provided
                    user_input_event.wait()
                    user_input_event.clear()
                    continue
                else:
                    # Send the line to the main thread
                    message_queue.put(("message", format_workflow_info(data_json)))
            except json.JSONDecodeError:  # todo: is this necessary?
                message_queue.put(("message", data))
        if data_json and "final_result" in data_json or "final_result" in str(data):
            break  # Stop processing after receiving the final result
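The coordination between the background thread and the page relies on two standard-library primitives: a queue.Queue that carries tagged messages to the main thread, and a threading.Event that holds the reader until the user has answered. The sketch below isolates that handshake; reader() and run_demo() are hypothetical names, the streamed lines are hard-coded instead of coming from httpx, and the "done" tag is an assumption added so the demo loop knows when to stop.

```python
import json
import queue
import threading


def reader(lines, message_queue, user_input_event):
    """Classify streamed lines; pause on request_user_input until released."""
    for line in lines:
        data = json.loads(line)
        if data.get("event_type") == "request_user_input":
            message_queue.put(("user_input_required", data))
            user_input_event.wait()   # blocks this background thread only
            user_input_event.clear()  # re-arm for the next prompt
        else:
            message_queue.put(("message", data))
    message_queue.put(("done", None))


def run_demo() -> list:
    lines = [
        json.dumps({"event_type": "server_message"}),
        json.dumps({"event_type": "request_user_input"}),
        json.dumps({"event_type": "server_message"}),
    ]
    message_queue = queue.Queue()
    user_input_event = threading.Event()
    thread = threading.Thread(
        target=reader, args=(lines, message_queue, user_input_event)
    )
    thread.start()
    kinds = []
    while True:
        kind, _ = message_queue.get(timeout=5)
        kinds.append(kind)
        if kind == "user_input_required":
            user_input_event.set()  # stands in for the user clicking Submit
        if kind == "done":
            break
    thread.join()
    return kinds


print(run_demo())
```

In the Streamlit app, the main thread would drain the queue on each rerun rather than in a loop, and would set the event only after the feedback has been posted to the backend.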
We store the messages in st.session_state and use an st.expander() to display and update the streamed data.
if st.session_state.received_lines:
    with expander_placeholder.container():
        # Create or update the expander with the latest truncated line
        expander = st.expander(st.session_state.expander_label)
        for line in st.session_state.received_lines:
            expander.write(line)
            expander.divider()
To keep the UI responsive and display the event messages while they are being processed in a background thread, we use a custom autorefresh component to refresh the page at a set interval:
if not st.session_state.workflow_complete:
    st_autorefresh(interval=2000, limit=None, key="data_refresh")
When the streamed event is of type "request_user_input", we display the related information in a separate container and gather user feedback. As one workflow run can produce multiple events that require user input, we put them in a message queue and assign a unique key to the st.feedback(), st.text_area() and st.button() widgets linked to each event, so that the widgets don't interfere with each other:
def gather_outline_feedback(placeholder):
    container = placeholder.container()
    with container:
        if st.session_state.user_input_required:
            data = st.session_state.user_input_prompt
            event_type = data.get("event_type")
            if event_type == "request_user_input":
                summary = data.get("event_content").get("summary")
                outline = data.get("event_content").get("outline")
                prompt_message = data.get("event_content").get(
                    "message", "Please review the outline."
                )

                # display the content for user input
                st.markdown("## Original Summary:")
                st.text_area("Summary", summary, disabled=True, height=400)
                st.divider()
                st.markdown("## Generated Slide Outline:")
                st.json(outline)
                st.write(prompt_message)
                # Define unique keys for widgets
                current_prompt = st.session_state.prompt_counter
                approval_key = f"approval_state_{current_prompt}"
                feedback_key = f"user_feedback_{current_prompt}"
                # Display the approval feedback widget
                approval = st.feedback("thumbs", key=approval_key)
                st.write(f"Current Approval state is: {approval}")
                logging.info(f"Current Approval state is: {approval}")
                # Display the feedback text area
                feedback = st.text_area(
                    "Please provide feedback if you have any:", key=feedback_key
                )
                # Handle the submission of user response
                if st.button(
                    "Submit Feedback", key=f"submit_response_{current_prompt}"
                ):
                    if not st.session_state.user_response_submitted:
                        # Retrieve approval and feedback using unique keys
                        approval_state = st.session_state.get(approval_key)
                        user_feedback = st.session_state.get(feedback_key, "")
                        # Ensure approval_state is valid
                        if approval_state not in [0, 1]:
                            st.error("Please select an approval option.")
                            return
                        user_response = {
                            "approval": (
                                ":material/thumb_down:"
                                if approval_state == 0
                                else ":material/thumb_up:"
                            ),
                            "feedback": user_feedback,
                        }
                        # Send the user's response to the backend
                        try:
                            response = requests.post(
                                "http://backend:80/submit_user_input",
                                json={
                                    "workflow_id": st.session_state.workflow_id,
                                    "user_input": json.dumps(user_response),
                                },
                            )
                            response.raise_for_status()
                            logging.info(
                                f"Backend response for submitting approval: {response.status_code}"
                            )
                        except requests.RequestException as e:
                            st.error(f"Failed to submit user input: {str(e)}")
                            return
                        ...
Finally, when the workflow run finishes, the frontend client receives a response that contains the download URLs of the generated files (the same slide deck as a PDF for rendering in the UI and as a PPTX for downloading as the final result). We display the PDF file and create a button for downloading the PPTX file:
if "download_url_pdf" in st.session_state and st.session_state.download_url_pdf:
    download_url_pdf = st.session_state.download_url_pdf
    try:
        # Fetch the PDF content
        pdf_response = requests.get(download_url_pdf)
        pdf_response.raise_for_status()
        st.session_state.pdf_data = pdf_response.content

        st.markdown("### Generated Slide Deck:")
        # Display the PDF using an iframe
        st.markdown(
            f'<iframe src="data:application/pdf;base64,{base64.b64encode(st.session_state.pdf_data).decode()}" width="100%" height="600px" type="application/pdf"></iframe>',
            unsafe_allow_html=True,
        )
    except Exception as e:
        st.error(f"Failed to load the PDF file: {str(e)}")

# Provide the download button for PPTX if available
if (
    "download_url_pptx" in st.session_state
    and st.session_state.download_url_pptx
):
    download_url_pptx = st.session_state.download_url_pptx
    try:
        # Fetch the PPTX content
        pptx_response = requests.get(download_url_pptx)
        pptx_response.raise_for_status()
        pptx_data = pptx_response.content
        st.download_button(
            label="Download Generated PPTX",
            data=pptx_data,
            file_name="generated_slides.pptx",
            mime="application/vnd.openxmlformats-officedocument.presentationml.presentation",
        )
    except Exception as e:
        st.error(f"Failed to load the PPTX file: {str(e)}")
We will create a multi-service Docker application with docker-compose to run the frontend and backend apps.
version: '3.8'

services:
  backend:
    build:
      context: ./backend
      args:
        - --no-cache
    ports:
      - "8000:80"
    networks:
      - app-network
    volumes:
      - .env:/app/.env
      - ./data:/app/data
      - ./workflow_artifacts:/app/workflow_artifacts
      - ~/.azure:/root/.azure
  frontend:
    build:
      context: ./frontend
      args:
        - --no-cache
    ports:
      - "8501:8501"
    networks:
      - app-network

networks:
  app-network:
That's it! Just run docker-compose up, and we now have an app that can run a research workflow based on the user's input query, prompt the user for feedback during the execution, and display the final result to the user.