Async handling in Go: unintuitive behavior with goroutines and context
Background
I am working on a server and decided to move away from traditional async processing for long-running requests (pub/sub, etc.) by using goroutines and context. My idea was to take a request and kick off a goroutine with a new timeout context that would complete the processing regardless of whether the initial request context is cancelled (e.g. the user refreshes).
I had done this before on a single endpoint, but this time I wanted a generalized, reusable wrapper that I could give a timeout and an async chunk of code to run (the code below is the working version):
```go
import (
	"context"
	"fmt"
	"sync"
	"time"

	"go.uber.org/multierr"
)

type ContextSplitter struct {
	InitialCtx context.Context
	Timeout    time.Duration
}

func NewContextSplitter(timeout time.Duration) *ContextSplitter {
	return &ContextSplitter{
		InitialCtx: context.Background(),
		Timeout:    timeout,
	}
}

func (c *ContextSplitter) Do(worker func(ctx context.Context) error) error {
	var wg sync.WaitGroup
	errs := make(chan error, 1)
	newCtx, cancel := context.WithTimeout(context.Background(), c.Timeout)
	defer cancel()

	wg.Add(1)
	// run the worker
	go func(ctx context.Context) {
		defer wg.Done()
		defer func() {
			if r := recover(); r != nil {
				// if the worker panics, send the panic error to the errs channel
				errs <- fmt.Errorf("worker panic: %v", r)
			}
		}()
		// call the worker function and send any returned errors to the errs channel
		errs <- worker(ctx)
	}(newCtx)

	// create a sync.Once object to ensure that the done channel is only closed once
	doneOnce := sync.Once{}
	done := make(chan bool, 1)
	// run a routine to listen for when the worker finishes executing
	go func() {
		wg.Wait()
		done <- true
		doneOnce.Do(func() {
			close(errs)
			close(done)
		})
	}()

	select {
	case <-c.InitialCtx.Done():
		// initial context cancelled, continue processing in background
		return c.InitialCtx.Err()
	case <-done:
		// continue
	}

	// collect any errors that occurred during execution and return them
	var err error
	for e := range errs {
		if e != nil {
			err = multierr.Append(err, e)
		}
	}
	return err
}
```
It is used like this:
```go
err := NewContextSplitter(time.Minute * 5).Do(func(newCtx context.Context) error {
	// do some long-running tasks, including propagating newCtx
	obj, err := doStuff(newCtx, stuff)
	if err != nil {
		return err
	}
	_ = obj // use obj here
	return nil
})
```
Question
I finally got it working, but I am posting because I am not completely sure why it works, and I am looking for some insight into the inner workings of Go, goroutines, and context.
The main fix ended up being the removal of the initial (request) context from `NewContextSplitter()`, i.e. changing this
```go
func NewContextSplitter(initialCtx context.Context, timeout time.Duration) *ContextSplitter {
	return &ContextSplitter{
		InitialCtx: initialCtx,
		Timeout:    timeout,
	}
}
```
to this
```go
func NewContextSplitter(timeout time.Duration) *ContextSplitter {
	return &ContextSplitter{
		InitialCtx: context.Background(),
		Timeout:    timeout,
	}
}
```
Basically, the request context would get cancelled, and any functions in my worker that took the new (timeout) context would fail with `ErrContextCancelled`. I think the cancellation of the initial context somehow propagated to `Do(worker)`, so the `defer cancel()` was called, which cancelled my new timeout context. The strange thing is that the worker did not exit when `newCtx` was cancelled by that `defer`; it kept running with a dead context and then errored out with `ErrContextCancelled`.
My main questions are:
- Why would passing `initialCtx` to `NewContextSplitter` allow the cancellation of that context to propagate to `Do(worker)`, since `Do()` does not take that context?
- Why does the worker continue processing with a cancelled context?
- Am I missing anything else? Any suggestions?
- Is this a valid pattern for handling long-running tasks?
Let me know if I can provide any more context (ha)