async handling in Go, unintuitive behavior with goroutines & context

Background

I am working on a server and decided to move away from traditional async processing for long running requests (pub/sub, etc.) by using goroutines and context. My idea was to take a request and kick off a goroutine with a new timeout context that will complete the processing regardless of if the initial request context is cancelled (i.e. user refreshes).

I had done this before on a single endpoint but wanted to make a generalized reusable wrapper this time that I could give a timeout and put an async chunk of code in (shown below is working code).

type ContextSplitter struct {
    InitialCtx context.Context
    Timeout    time.Duration
}

func NewContextSplitter(timeout time.Duration) *ContextSplitter {
    return &ContextSplitter{
        InitialCtx: context.Background(),
        Timeout:    timeout,
    }
}

func (c *ContextSplitter) Do(worker func(ctx context.Context) error) error {
    var wg sync.WaitGroup
    errs := make(chan error, 1)
    newCtx, cancel := context.WithTimeout(context.Background(), c.Timeout)

    defer cancel()

    wg.Add(1)

    // run the worker
    go func(ctx context.Context) {
        defer wg.Done()
        defer func() {
            if r := recover(); r != nil {
                // if the worker panics, send the panic error to the errs channel
                errs <- fmt.Errorf("worker panic: %v", r)
            }
        }()

        // call the worker function and send any returned errors to the errs channel
        errs <- worker(ctx)
    }(newCtx)

    // create a sync.Once object to ensure that the done channel is only closed once
    doneOnce := sync.Once{}
    done := make(chan bool, 1)

    // run a routine to listen for when the worker finishes executing
    go func() {
        wg.Wait()
        done <- true
        doneOnce.Do(func() {
            close(errs)
            close(done)
        })
    }()

    select {
    case <-c.InitialCtx.Done():
        // initial context cancelled, continue processing in background
        return c.InitialCtx.Err()
    case <-done:
        // continue
    }

    // collect any errors that occurred during execution and return them
    var err error
    for e := range errs {
        if e != nil {
            err = multierr.Append(err, e)
        }
    }

    return err
}

Which is used as so

err := NewContextSplitter(time.Minute*5).Do(func(newCtx context.Context) error {
    // do some long-running tasks, including propagating the newCtx
    obj, err := doStuff(newCtx, stuff)
}

Question

I finally got it working but I am making this post because I am not completely sure why it works and am looking for some insight into the inner workings of golang, goroutines, & context.

The main fix ended up being the removal of initial (request) context from NewContextSplitter(), i.e. this

func NewContextSplitter(initialCtx context.Context, timeout time.Duration) *ContextSplitter {
    return &ContextSplitter{
        InitialCtx: initialCtx,
        Timeout:    timeout,
    }
}

to this

func NewContextSplitter(timeout time.Duration) *ContextSplitter {
    return &ContextSplitter{
        InitialCtx: context.Background(),
        Timeout:    timeout,
    }
}

Basically, the request context would get cancelled and any functions in my worker that took the new (timeout) context would fail with ErrContextCancelled. I am thinking that the initial context cancellation got propagated to my Do(worker) and the defer cancel() would get called, thereby cancelling my new timeout context. The strange thing is that the worker would not exit via the newCtx cancellation from the defer, but continue running with a bunk context and err out on ErrContextCancelled.

My main questions are:

  • Why would passing initialCtx to NewContextSplitter allow for the propagation of cancelling that context to Do(worker), since Do() does not take that context?
  • Why does the worker continue processing with a cancelled context?
  • Am I missing anything else / any suggestions?
  • Is this a valid pattern for handling long running tasks?

Let me know if I can provide any more context (ha)



Comments

Popular posts from this blog

Today Walkin 14th-Sept

Spring Elasticsearch Operations

Hibernate Search - Elasticsearch with JSON manipulation