Optimization with a nested set of constraints

I am trying to use Gekko to find an optimal solution for a camera lens distortion calibration problem (Undistort). In my case I am interested in solving a simpler problem that is only concerned with determining two or four variables (K1 K2 || K1 K2 CX CY). I was able to use scipy.optimize.minimize to come to some reasonable solutions that I know are within a good range, however, my solution relies on a rather complex invertibility constraint of the final result and I found that this was not possible to implement in scipy. So, I am looking for some help on how to model this invertibility constraint using Gekko.

The invertibility constraint is as follows ( see div_constraint_invertibility() ):

def get_rad_norm_k(dc, w, h):
      dmi = get_max_dist(dc=dc, w=w, h=h)
      r2 = m.sqrt(dmi) / 2
      r2_2 = r2*r2
      r2_4 = r2_2*r2_2
      denk1 = -12 * r2_2
      denk2 = -12 * r2_4
      return denk1, denk2
def get_max_dist(dc, h, w):
    corner_distances = [(dc[0] - 0)**2 + (dc[1] - 0)**2,
                        (dc[0] - 0)**2 + (dc[1] - w)**2,
                        (dc[0] - h)**2 + (dc[1] - w)**2,
                        (dc[0] - h)**2 + (dc[1] - 0)**2]

    mx = corner_distances[0]
    for cd in corner_distances[1:]:
        mx = m.max3(mx, cd)
    return mx
def div_constraint_invertibility(x, ldm):

    p1 = x[0]
    p2 = x[1]
    dc = [ldm.Cx, ldm.Cy]
    
    w = ldm.width
    h = ldm.height
    denk1, denk2 = get_rad_norm_k(dc=dc, w=w, h=h)
    
    r1sq = get_max_dist(dc=dc, w=w, h=h)
    r1p4 = r1sq**2
    
    k1 = (((-p1) / (1 + p1)) + ((16 * p2) / (1 + p2))) / denk1
    k2 = (((-4 * p2) / (1 + p2)) + (p1 / (1 + p1))) / denk2
    
    if -2 < r1sq * k1 < 2:
        if -1. - r1sq * k1 < r1p4 * k2 < (1. - r1sq * k1) / 3:
            return 1  # Passed Check
    else:
        if r1sq * k1 >= 2:
            if -1. - r1sq * k1 < r1p4 * k2 < (-r1p4 * k1**2 / 12):
                return 1  # Passed Check
    
    return -1  # Failed Check

The list of values in argument x contains [P1, P2] or [P1, P2, CX, CY] where P1 and P2 are normalized values of K1 and K2. I have tried to implement the methods using Gekko functions, my next step was to try to implement the conditional checks in "div_constraint_invertibility" using m.if3(). Although, I am wondering if there is a better way to format the problem here as I feel that such a nested m.if3() structure is not a very elegant solution and moreover I am not certain what I should actually return from the function (I probably should be returning some function value rather than -1 or 1...).

EDIT 1 Scipy entry point and other contextual code. Here is the scipy minimization approach. The function cumulative_point_to_line_error takes a set of points belonging to an arc extracted from a distorted image and applies the distortion model to determine the line equation fit error after undistorting the image points. An initial starting point looks something like:

dm0 = [-0.0, 0.0, input_image_center_X, input_image_center_Y]

Along with an array of points that correspond to an arc. For example, depending on the image there may be 30 lists containing sets of points relating to the same extracted arc.

I can provide even more code but wanted to avoid dumping a wall of code...let me know and I can dm you it or I can provide more code here too.

def model_estimation(ip, w, h, oc):
    ldm = ip.lens_model
    dc = [ldm.Cx, ldm.Cy]
    prevk1 = ldm.d[1]

    if len(ldm.d) != 3:
        ldm.d.append(0)
    prevk2 = ldm.d[2]
    if oc:
        dm0 = [prevp1, prevp2, dc[0], dc[1]]
    else:
        dm0 = [prevp1, prevp2]

    cons = []
    bnds = []
    if oc:
        dc_drift = 2
        bnds = [(-np.inf, np.inf), (-np.inf, np.inf),
                (float(ldm.width / 2 - dc_drift), float(ldm.width / 2 + dc_drift)),
                (float(ldm.height / 2 - dc_drift), float(ldm.height / 2 + dc_drift))]

    cons.append({'type': 'ineq', 'fun': div_constraint_invertibility, 'args': (ip.lens_model,)})

    print(f"Optimizing: {dm0}")
    if cons:
        print(f"Constraints:")
        for con in cons:
            print(f"Const: {con['fun'].__name__}")
    if bnds:
        print(f"Bounds:")
        for idx, bnd in enumerate(bnds):
            print(f"{bnd[0]} <= x{idx} <= {bnd[1]}")

    result = spo.minimize(fun=cumulative_point_to_line_error, x0=np.array(dm0), args=(ip,),
                          constraints=cons, bounds=bnds, method='SLSQP', options={"disp": True})

    print(f"Optimization Result: {result.x} | {result.fun}")

EDIT 2

I gave it a shot trying to convert my objective to one that is useable with Gekko. Is is possible to call m.Minimize on something like 'cumulative_point_to_line_error'? Currently working on how to handle the division by 0 check in the 'div_update_point' method:

def HornerPoly(poly, n, nrmx):
    sol = poly[n]
    i = n - 1
    while i > -1:
        sol = sol * nrmx + poly[i]
        i -= 1
    return sol


def div_update_point(m, px, py, K1, K2, dc_x, dc_y):
    center_dist = (px - dc_x)**2 + (py - dc_y)**2
    dist_coef = [1, K1, K2]
    A = HornerPoly(dist_coef, len(dist_coef) - 1, center_dist)

    # m.IF ?
    if A == 0:
        return px, py
    else:
        A = 1.0 / A

    new_x = dc_x + (px - dc_x) * A
    new_y = dc_y + (py - dc_y) * A
    return new_x, new_y

def cumulative_point_to_line_error(m, p1, p2, dc_x, dc_y, w, h, houghlines):

n1 = m.Intermediate(((-p1) / (1 + p1)) + (16 * p2 / (1 + p2)))
n2 = m.Intermediate(((-4 * p2) / (1 + p2)) + (p1 / (1 + p1)))
denk1, denk2 = get_rad_norm_k(m, dc_x=dc_x, dc_y=dc_y, w=w, h=h)

cumulative_err = 0
for idx, hl in enumerate(houghlines):

    pnts_o = hl.pnts
    err = 0
    zero = 10e-100
    num_pnts = len(pnts_o)
    um = vm = 0
    suu = suv = svv = 0

    pnts_e = []
    for pnt in pnts_o:
        x, y = div_update_point(px=pnt[0], py=pnt[1], K1=n1/denk1, K2=n2/denk2, dc_x=dc_x, dc_y=dc_y)
        pnts_e.append([x, y])

    for pnt in pnts_e:
        x = pnt[0]
        y = pnt[1]
        um += x
        vm += y
    um = um / num_pnts
    vm = vm / num_pnts

    for pnt in pnts_e:
        x = pnt[0]
        y = pnt[1]
        suu += (x - um)**2
        suv += (x - um) * (y - vm)
        svv += (y - vm)**2
    suu = suu / num_pnts
    suv = suv / num_pnts
    svv = svv / num_pnts

    if np.abs(suv) <= zero:
        # print('XY <= ZERO*')
        if suu < svv and svv > zero:
            hl.a = 1.
            hl.b = 0.
            hl.c = -um
            err = 0.
            # print(f'Iter. Method Line: {self.a}x + {self.b}y + {self.c} = 0')
            # print(f'Error == {err}')
            return hl, err
        if svv < suu and suu > zero:
            hl.a = 0.
            hl.b = 1.
            hl.c = -vm
            err = 0
            # print(f'Iter. Method Line: {self.a}x + {self.b}y + {self.c} = 0')
            # print(f'Error == {err}')
            return hl, err
        # print('Failed to recalculate!')
        return None, err

    r = np.array([[1, 0, 0],
                  [1, 0, 0],
                  [0, 1, 0],
                  [0, 1, 0]], dtype=float)

    h = 0.5 * (suu - svv) / suv
    if h > 0:
        r[0, 1] = -h - np.sqrt(1. + np.square(h))
        r[0, 2] = -(um + r[0, 1] * vm)
        r[1, 1] = -1. / r[0, 1]
        r[1, 2] = -(um + r[1, 1] * vm)
        r[2, 0] = h + np.sqrt(1. + np.square(h))
        r[2, 2] = -(r[2, 0] * um + vm)
        r[3, 0] = -1. / r[2, 0]
        r[3, 2] = -(r[3, 0] * um + vm)
    else:
        r[0, 1] = -h + np.sqrt(1. + np.square(h))
        r[0, 2] = -(um + r[0, 1] * vm)
        r[1, 1] = -1. / r[0, 1]
        r[1, 2] = -(um + r[1, 1] * vm)
        r[2, 0] = h - np.sqrt(1. + np.square(h))
        r[2, 2] = -(r[2, 0] * um + vm)
        r[3, 0] = -1. / r[2, 0]
        r[3, 2] = -(r[3, 0] * um + vm)

    for row in range(0, r.shape[0]):
        norm = np.sqrt(np.square(r[row, 0]) + np.square(r[row, 1]))
        for col in range(0, r.shape[1]):
            r[row, col] /= norm

    min = 0
    k = 0
    for pnt in pnts_e:
        x = pnt[0]
        y = pnt[1]
        sol = r[0, 0] * x + r[0, 1] * y + r[0, 2]
        min += np.square(sol)

    for row in range(1, r.shape[0]):
        h = 0
        for pnt in pnts_e:
            x = pnt[0]
            y = pnt[1]
            sol = r[row, 0] * x + r[row, 1] * y + r[row, 2]
            h += np.square(sol)
        if h < min:
            k = row
            min = h

    hl.a = r[k, 0]
    hl.b = r[k, 1]
    hl.c = r[k, 2]
    # err = min

    # Update ip TODO: edit houghline list?
    houghlines[idx] = hl
    cumulative_err += min
return cumulative_err


Comments

Popular posts from this blog

Today Walkin 14th-Sept

Spring Elasticsearch Operations

Hibernate Search - Elasticsearch with JSON manipulation