Streaming with REST API for LangChain Applications

Arun Chalise
3 min read · Apr 23, 2024

Implementing a streaming endpoint for a LangChain application

Responses from LLM applications can take a while to complete, depending on the length of the content, and showing the response only once it has finished makes for a poor user experience. Streaming addresses this by displaying the content to the user as it is generated, which makes the application feel more engaging and responsive.

With LangChain4j, implementing a streaming API only requires the method in the agent interface to return TokenStream instead of String.

import dev.langchain4j.service.SystemMessage;
import dev.langchain4j.service.TokenStream;

public interface CustomerServiceAgent {

    /**
     * Non-streaming
     */
    @SystemMessage({
        "You are a customer support officer of ABC insurance, Sydney Australia.",
        "Before providing information about a claim application, you MUST always check:",
        "correlationId or email. If the customer is in APPROVED state, please tell the customer that their claim will be settled in two weeks",
        "When retrieving all customers, display the records in a tabular format",
        "Today is {{current_date}}."
    })
    String chat(String userMessage);

    /**
     * Streaming
     */
    // Note: @SystemMessage is declared per method; repeat it here if the
    // streaming method should use the same system prompt.
    TokenStream chatStream(String userMessage);
}

The customer service agent can now be configured with both a streaming and a non-streaming model, as below:

@Bean
CustomerServiceAgent customerSupportAgent(StreamingChatLanguageModel streamingChatLanguageModel,
                                          ChatLanguageModel chatLanguageModel,
                                          ContentRetriever contentRetriever,
                                          CustomerService customerService) {
    return AiServices.builder(CustomerServiceAgent.class)
            .streamingChatLanguageModel(streamingChatLanguageModel)
            .chatLanguageModel(chatLanguageModel)
            .chatMemory(MessageWindowChatMemory.withMaxMessages(20))
            .tools(customerService)
            .contentRetriever(contentRetriever)
            .build();
}

@Bean
StreamingChatLanguageModel streamingModel() {
    return OpenAiStreamingChatModel.withApiKey(System.getenv("AI_OPENAI_API_KEY"));
}

Next, simply add a Spring WebFlux REST endpoint that wraps the agent and serves the streaming response as server-sent events.

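import dev.langchain4j.service.TokenStream;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

@RestController
@CrossOrigin(origins = "*")
public class ChatController {

    private final CustomerServiceAgent agent;

    public ChatController(CustomerServiceAgent agent) {
        this.agent = agent;
    }

    @GetMapping("/chat")
    public String chat(@RequestParam("question") String question) {
        System.out.println("Submitted query ");
        System.out.println(question);
        return agent.chat(question);
    }

    @GetMapping(value = "/chatstream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> chatStream(@RequestParam("question") String question) {
        // Unicast sink: buffers tokens until the single HTTP subscriber consumes them
        Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

        TokenStream ts = agent.chatStream(question);
        ts.onNext(sink::tryEmitNext)
                // Complete the Flux when the model finishes, so the response can end
                .onComplete(response -> sink.tryEmitComplete())
                .onError(sink::tryEmitError)
                .start();

        return sink.asFlux();
    }
}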

The example above uses Sinks, a useful construct from Project Reactor, to push data programmatically into a Mono or Flux. Every time a token is received from the CustomerServiceAgent, it is emitted through the sink, and the subscriber to the resulting Flux sees it as a stream of tokens.

A simple ReactJS Chat Interface

A simple ReactJS application can be quickly spun up with Next.js by following its instructions. For simplicity, we have implemented the chat UI in the default page, page.tsx, and the streaming version in stream/page.tsx. To integrate with the locally running LangChain agent above, we can simply call the API and update the page:

const post = async () => {
  setResponse("Please wait, retrieving response ...");
  // Encode the question so characters such as & or # do not break the query string
  const response = await fetch(`http://localhost:8080/chat?question=${encodeURIComponent(message)}`);
  const content = await response.text();
  setResponse(content);
};
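Because the question travels as a query parameter, characters such as `&`, `#`, or `?` in the user's text need to be percent-encoded, otherwise the server may receive a truncated or altered question. The following is a minimal sketch of a URL-building helper that both pages could share; `buildChatUrl` and `API_BASE` are hypothetical names introduced here for illustration and are not part of the original application.

```typescript
// Assumed address of the locally running agent (taken from the examples in this post).
const API_BASE = "http://localhost:8080";

// Hypothetical helper: builds the request URL for an endpoint such as
// /chat or /chatstream, percent-encoding the user's question.
function buildChatUrl(path: string, question: string): string {
  // encodeURIComponent escapes reserved characters (&, #, ?, spaces, ...)
  // so the whole question arrives as a single query parameter value.
  return API_BASE + path + "?question=" + encodeURIComponent(question);
}

// Example usage with a question containing reserved characters.
console.log(buildChatUrl("/chat", "What is the status of claim A&B #42?"));
```

With a helper like this, the `fetch` call only needs `buildChatUrl("/chat", message)`, and the streaming page can reuse it with `"/chatstream"`.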

For the streaming endpoint, we can use an EventSource instance to keep a persistent connection open and update the content of the page as the response tokens are received.

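const post = async () => {
  setResponse("Please wait, retrieving response ...");
  // Encode the question so characters such as & or # do not break the query string
  const eventsrc = new EventSource(`http://localhost:8080/chatstream?question=${encodeURIComponent(message)}`);
  setResponse('');
  eventsrc.onmessage = (msg) => {
    setResponse((response) => response + ' ' + msg.data);
  };
  // EventSource reconnects automatically when the connection drops, which would
  // resubmit the question once the server finishes; close it instead.
  eventsrc.onerror = () => eventsrc.close();
};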
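It can help to see what actually arrives on the wire. A `text/event-stream` body is a sequence of `field:value` lines, with a blank line ending each event, and under the server-sent events specification the client removes one leading space from each value before exposing it as `msg.data`. The sketch below is a simplified illustration of that rule, not the browser's implementation: `parseSseData` is a hypothetical function, the sample input is made up, and `event:`, `id:`, `retry:` fields and comments are ignored. This stripping is a likely reason tokens can reach the page without their leading whitespace, which the handler above compensates for by inserting a space between tokens.

```typescript
// Simplified sketch: extract the `data` payload of each event from a
// text/event-stream body. Only `data:` lines are handled.
function parseSseData(body: string): string[] {
  const events: string[] = [];
  let dataLines: string[] = [];

  for (const line of body.split(/\r\n|\n|\r/)) {
    if (line === "") {
      // A blank line dispatches the event collected so far, if any.
      if (dataLines.length > 0) {
        events.push(dataLines.join("\n"));
        dataLines = [];
      }
      continue;
    }
    if (line.indexOf("data:") === 0) {
      let value = line.slice("data:".length);
      // Per the SSE specification, exactly one leading space is removed.
      if (value.charAt(0) === " ") {
        value = value.slice(1);
      }
      dataLines.push(value);
    }
  }
  // An event not terminated by a blank line is discarded, as in the spec.
  return events;
}

// Made-up sample: three token events, two of which start with a space.
const wire = "data:Your\n\ndata: claim\n\ndata: is approved\n\n";
console.log(parseSseData(wire));
```

Note that a token sent as ` claim` comes out as `claim`: the leading space is consumed by the framing rule rather than delivered to the handler.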
