React Query for data streaming
Introduction
Most React developers are familiar with @tanstack/react-query, the popular library for managing API requests. It is typically used for HTTP API requests from the browser, but that’s not the only way to use it. React Query accepts any promise-returning function in its queries and mutations, which lets us combine it with data streaming. Let’s get started!
Theory
Client setup
Since we want to handle data streaming, we assume some kind of WebSocket connection with STOMP on top of it. Let’s imagine that we have a very simple data flow.
const stompClient = new StompClient()
stompClient.subscribe('mychannel', (event) => {...})
stompClient.publish('mychannel', {...})
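The article treats StompClient as a black box. For readers who want to experiment without a broker, here is a minimal in-memory stand-in for the shape the later snippets appear to rely on: subscribe(channel, callback) returning a handle with unsubscribe(), and publish(channel, payload) returning a promise. The name FakeStompClient and its behaviour are assumptions made for illustration, not the API of any real STOMP library.

```typescript
// Hypothetical in-memory stand-in for the StompClient used in this article.
type Listener<T = unknown> = (event: T) => void;

interface Subscription {
  unsubscribe(): void;
}

class FakeStompClient {
  private listeners = new Map<string, Set<Listener>>();

  // Registers a callback for a channel and returns a handle that removes it.
  subscribe<T>(channel: string, callback: Listener<T>): Subscription {
    const set = this.listeners.get(channel) ?? new Set<Listener>();
    set.add(callback as Listener);
    this.listeners.set(channel, set);
    return {
      unsubscribe: () => {
        set.delete(callback as Listener);
      },
    };
  }

  // Delivers the payload to every current subscriber of the channel.
  publish<T>(channel: string, payload: T): Promise<void> {
    // Copy first so that unsubscribing during delivery does not disturb iteration.
    const current = Array.from(this.listeners.get(channel) ?? []);
    for (const listener of current) {
      listener(payload);
    }
    return Promise.resolve();
  }
}

const fake = new FakeStompClient();
const received: string[] = [];
const sub = fake.subscribe<string>("mychannel", (event) => received.push(event));
fake.publish("mychannel", "hello");
sub.unsubscribe();
fake.publish("mychannel", "ignored after unsubscribe");
```

A real client delivers events asynchronously over the network. The fake delivers them synchronously, which keeps the example easy to follow but is not how a broker behaves.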
To make the client accessible from any part of our application, we place it in a React context that is provided in the root component. For now, this context only contains the client instance used by our queries and mutations.
import { createContext, useContext, useState, PropsWithChildren } from 'react'

const StompContext = createContext<{ client?: StompClient }>({
  client: undefined
})

function StompManager({ children }: PropsWithChildren) {
  // Lazy initializer: the client is created once, not on every render
  const [client] = useState(() => new StompClient())
  return <StompContext.Provider value={{ client }}>{children}</StompContext.Provider>
}

function useStompManager() {
  const { client } = useContext(StompContext)
  return client
}
Query
Let’s revisit how queries work. They consist of an identification key, a query function, and options. Since the identification key is self-explanatory, we’ll focus on the query function and options.
Firstly, each query should only be activated when our client is ready.
const client = useStompManager()
...
useQuery([QueryKey], () => {...}, { enabled: !!client })
Secondly, each query requires a function to subscribe to the channel and wait for new data. There are several different subscription types:
- One-event subscription: a pattern where we receive a single event from the channel and then complete the subscription (e.g., getting a user profile).
- Infinite event subscription: a pattern where we listen to the channel until it’s unsubscribed or cancelled (e.g., listening to chat messages). Each subscription should be kept in a subscriptions storage so that repeated query calls do not create duplicate subscriptions.
Since we may have infinite queries, we need to create a subscriptions storage in the StompContext. For this, I'll use the useMap hook from the react-use package. Let's establish a rule that the subscription identifier is the query key.
const [subscribers, { set, get }] = useMap<Record<string, boolean>>();
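One wrinkle with using the query key as the subscription identifier is that query keys are usually arrays, while a plain map needs a stable string key. The React-independent sketch below shows one way to handle this: serialize the key and create a subscription only when none is stored for it. SubscriptionRegistry, ensure and release are hypothetical names, and JSON.stringify is a simplifying assumption (it is sensitive to object property order, for example).

```typescript
interface Unsubscribable {
  unsubscribe(): void;
}

// Hypothetical registry: at most one live subscription per query key.
class SubscriptionRegistry {
  private store = new Map<string, Unsubscribable>();

  // Simplified key serialization; assumes keys are JSON-serializable.
  private static id(queryKey: readonly unknown[]): string {
    return JSON.stringify(queryKey);
  }

  // Creates the subscription only if this key has none yet.
  // Returns true when a new subscription was created.
  ensure(queryKey: readonly unknown[], create: () => Unsubscribable): boolean {
    const id = SubscriptionRegistry.id(queryKey);
    if (this.store.has(id)) {
      return false;
    }
    this.store.set(id, create());
    return true;
  }

  // Unsubscribes and forgets the subscription for this key, if any.
  release(queryKey: readonly unknown[]): void {
    const id = SubscriptionRegistry.id(queryKey);
    this.store.get(id)?.unsubscribe();
    this.store.delete(id);
  }
}

const registry = new SubscriptionRegistry();
let created = 0;
let closed = 0;
const open = (): Unsubscribable => {
  created += 1;
  return { unsubscribe: () => { closed += 1; } };
};

const first = registry.ensure(["chat", 42], open);   // creates the subscription
const second = registry.ensure(["chat", 42], open);  // same key: nothing new
registry.release(["chat", 42]);
const third = registry.ensure(["chat", 42], open);   // key is free again
```

Storing the subscription handle itself, rather than a boolean flag, makes it possible to unsubscribe later, for instance when the last component using the query unmounts.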
Mutation
The implementation of mutations can vary depending on the protocol used. For instance, a chat service could echo a sent message back in a channel to indicate successful transmission. However, that is a special case. If the STOMP client simply has a publish method that returns a promise, the mutation doesn't need any specific logic.
function useStompMutation(mutationKey: string, destination: string) {
// getting client
// default useMutation hook with sending the message in mutation function
}
Sending without confirmation
function useStompConfirmableMutation(mutationKey: string, destination: string, channel: string) {
  // getting client
  // default useMutation hook with the following mutation function:
  // 0. Return a promise with resolve and reject parameters
  // 1. Subscribe to the specific channel and resolve the promise there
  // 2. Send a message based on the mutation function parameters
  // 3. Handle errors by rejecting the promise
}
Sending with confirmation
Implementation
One event query
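import { useQuery, QueryKey, UseQueryOptions } from '@tanstack/react-query'

// Resolves with the first event received on the channel, then unsubscribes
function listenOnce<DATA>(client: StompClient, channel: string) {
  return new Promise<DATA>((resolve) => {
    const sub = client.subscribe(channel, (event: DATA) => {
      sub.unsubscribe()
      resolve(event)
    })
  })
}

function useStompQuery<DATA, ERROR>(
  queryKey: QueryKey,
  channel: string,
  options?: UseQueryOptions<DATA, ERROR>
) {
  const client = useStompManager()
  return useQuery<DATA, ERROR>(
    queryKey,
    // The query stays disabled until the client exists (see `enabled` below)
    () => listenOnce<DATA>(client!, channel),
    {
      ...options,
      enabled: !!client && (options?.enabled ?? true),
    }
  )
}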
Infinite events query
function useStompQuery<DATA, ERROR>(
  queryKey: QueryKey,
  channel: string,
  // Returns the list that should be stored in the cache after a new event
  newDataResolver: (previousData: DATA[], newComingData: DATA) => DATA[],
  options?: UseQueryOptions<DATA[], ERROR>,
) {
  const client = useStompManager()
  const { setSubscriber, getSubscriber } = useContext(StompContext)
  const queryClient = useQueryClient()
  // Hook from react-use package
  useMount(() => {
    // Skip if the client is missing or this query key is already subscribed
    if (!client || getSubscriber(queryKey)) {
      return
    }
    const sub = client.subscribe(channel, (event: DATA) => {
      const previousData = queryClient.getQueryData<DATA[]>(queryKey) ?? []
      const nextData = newDataResolver(previousData, event)
      queryClient.setQueryData(queryKey, nextData)
    })
    setSubscriber(queryKey, sub)
  })
  return useQuery<DATA[], ERROR>(
    queryKey,
    // A query function must not return undefined, so fall back to an empty list
    () => queryClient.getQueryData<DATA[]>(queryKey) ?? [],
    {
      ...options,
      enabled: !!client && (options?.enabled ?? true),
    }
  )
}
Let’s highlight a few implementation details:
- newDataResolver: a function that lets us control how each incoming event is merged into the cached data (e.g., whenever a new message arrives, we can mark it with a sent status).
- queryClient.getQueryData<DATA[]>(queryKey) in the query function: since this query has no one-time fetching function, it returns the query’s own cached data on each refetch.
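To make the role of newDataResolver more concrete, here is a sketch of a resolver for the chat example mentioned above. It assumes a hypothetical ChatMessage shape with an id and a status field, and it assumes the server echoes each message back on the channel. It returns a new array instead of mutating the previous one, because React Query expects cached data to be updated immutably.

```typescript
// Hypothetical message shape, used only for this illustration.
interface ChatMessage {
  id: string;
  text: string;
  status: "pending" | "sent";
}

// Marks the incoming message as sent, then replaces the cached entry with the
// same id or appends the message when it is new. The input is never mutated.
function resolveChatMessage(
  previous: ChatMessage[] | undefined,
  incoming: ChatMessage
): ChatMessage[] {
  const confirmed: ChatMessage = { ...incoming, status: "sent" };
  const list = previous ?? [];
  const exists = list.some((message) => message.id === confirmed.id);
  return exists
    ? list.map((message) => (message.id === confirmed.id ? confirmed : message))
    : [...list, confirmed];
}

const before: ChatMessage[] = [{ id: "1", text: "hi", status: "pending" }];
// The server echoes message "1" back: the pending entry becomes "sent".
const afterEcho = resolveChatMessage(before, { id: "1", text: "hi", status: "pending" });
// A message from someone else arrives: it is appended.
const afterNew = resolveChatMessage(afterEcho, { id: "2", text: "hello", status: "pending" });
```

A function like this would be passed as the newDataResolver argument of the infinite events hook.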
Mutation without confirmation
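function useStompMutation<T, DATA>(
  mutationKey: string,
  destination: string,
  options?: UseMutationOptions<DATA, unknown, T>
) {
  const client = useStompManager()
  return useMutation<DATA, unknown, T>(
    // Mutation keys are arrays in @tanstack/react-query
    [mutationKey],
    async (message: T) => {
      if (!client) {
        throw new Error('Client not found')
      }
      // Return the publish promise so that failures reach the mutation's error state
      return client.publish(destination, message)
    },
    options
  )
}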
Mutation with confirmation
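function useStompConfirmableMutation<T, DATA>(
  mutationKey: string,
  destination: string,
  channel: string,
  confirmation?: (event: DATA) => boolean,
  options?: UseMutationOptions<DATA, unknown, T>
) {
  const client = useStompManager()
  return useMutation<DATA, unknown, T>(
    // Mutation keys are arrays in @tanstack/react-query
    [mutationKey],
    (message: T) => new Promise<DATA>((resolve, reject) => {
      if (!client) {
        // Reject explicitly: a throw inside an async executor would be lost
        reject(new Error('Client not found'))
        return
      }
      // Subscribe before publishing so the confirmation event cannot be missed
      const sub = client.subscribe(channel, (event: DATA) => {
        // Without a confirmation predicate, the first event confirms the send
        if (!confirmation || confirmation(event)) {
          sub.unsubscribe()
          resolve(event)
        }
      })
      // Assumes publish returns a promise, as described in the Theory section
      client.publish(destination, message).catch((e: unknown) => {
        sub.unsubscribe()
        reject(e)
      })
    }),
    options
  )
}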
The potential issue here is that if the confirming event never arrives in the channel, the promise never settles and the mutation stays pending indefinitely. Rejecting after a timeout could resolve this, but I’ll leave that up to you 🙂
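As one possible starting point for that timeout, here is a small generic wrapper that rejects when a promise has not settled within a given time. The name withTimeout, the optional onTimeout cleanup callback and the error message are assumptions for this sketch. In the confirmable mutation, the cleanup callback would be the place to unsubscribe from the channel.

```typescript
// Rejects if `promise` has not settled within `ms` milliseconds.
// `onTimeout` is an optional cleanup hook, e.g. for unsubscribing.
function withTimeout<T>(
  promise: Promise<T>,
  ms: number,
  onTimeout?: () => void
): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => {
      onTimeout?.();
      reject(new Error(`No confirmation received within ${ms} ms`));
    }, ms);
    promise.then(
      (value) => {
        clearTimeout(timer);
        resolve(value);
      },
      (error) => {
        clearTimeout(timer);
        reject(error);
      }
    );
  });
}

// A promise that never settles stands in for a confirmation that never arrives.
const neverConfirmed = new Promise<string>(() => {});
let cleanedUp = false;
const timedOut = withTimeout(neverConfirmed, 20, () => { cleanedUp = true; }).then(
  () => "resolved",
  (error: Error) => error.message
);

// A promise that settles in time passes through unchanged.
const inTime = withTimeout(Promise.resolve("ok"), 20);
```

Wrapping the promise returned by the mutation function in a helper like this means the mutation moves to its error state after the deadline instead of staying pending.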
We’ve successfully implemented a working solution for data streaming in React Query. However, this solution may not be ideal for all use-cases, so you might need to adjust it depending on your data streaming service implementation.
Thank you for reading!