2022-10-17

Apache Airflow idempotent DAG implementation

I am generating a start and end time for an API query using the following:

from datetime import datetime, timedelta
import pytz

startTime = datetime.now(pytz.timezone('US/Eastern')) - timedelta(hours=1)
endTime = datetime.now(pytz.timezone('US/Eastern'))

This works and generates the correct parameters for the API query. However, I noticed that if the task fails and I rerun it, it uses new values for startTime and endTime, because they are computed from the current time whenever the DAG runs.

I am trying to figure out how to make this idempotent, so that if the task fails and I rerun it, the same startTime and endTime from the original task execution are used.
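To make the goal concrete, here is a minimal sketch of what an idempotent version would compute: the window is derived from a fixed reference timestamp that belongs to the run, not from datetime.now(). The function name query_window and the fixed UTC-4 offset are hypothetical and only for illustration. In a real DAG the reference time would presumably come from the scheduler, and the tz argument could be pytz.timezone('US/Eastern').

```python
from datetime import datetime, timedelta, timezone, tzinfo
from typing import Tuple

FMT = "%Y-%m-%d %H:%M:%S"


def query_window(interval_end: datetime, tz: tzinfo, hours: int = 1) -> Tuple[str, str]:
    """Return (startTime, endTime) strings for the window ending at interval_end.

    The result depends only on the arguments, so calling it again for the
    same run (for example on a retry) yields the same window.
    """
    if interval_end.tzinfo is None:
        raise ValueError("interval_end must be timezone-aware")
    # Subtract on the absolute timeline first, then convert for display.
    start = interval_end - timedelta(hours=hours)
    return start.astimezone(tz).strftime(FMT), interval_end.astimezone(tz).strftime(FMT)


if __name__ == "__main__":
    # Assumption: a fixed UTC-4 offset stands in for US/Eastern during DST.
    eastern_dst = timezone(timedelta(hours=-4))
    run_end = datetime(2022, 10, 17, 15, 0, tzinfo=timezone.utc)
    print(query_window(run_end, eastern_dst))
```

Calling query_window twice with the same run_end gives the same pair of strings, which is the behavior I want on a rerun.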

I have read a bit about templates and macros but I can't seem to get it to work correctly.

Here is the task code. I am using the KubernetesPodOperator.

ant_get_logs = KubernetesPodOperator(
    env_vars={
        "startTime": startTime.strftime('%Y-%m-%d %H:%M:%S'),
        "endTime": endTime.strftime('%Y-%m-%d %H:%M:%S'),
        "timeZone":'US/Eastern',
        "session":'none',
    },

    volumes=[volume],
    volume_mounts=[volume_mount],

    task_id='ant_get_logs',
    image='test:1.0.0',
    image_pull_policy='Always',
    in_cluster=True,
    namespace=namespace,
    name='kubepod_ant_get_logs',
    random_name_suffix=True,
    labels={'app': 'backend', 'env': 'dev'},
    reattach_on_restart=True,
    is_delete_operator_pod=True,
    get_logs=True,
    log_events_on_failure=True,
)
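One possible direction with templates, as an untested sketch: pass Jinja expressions in env_vars instead of values computed with datetime.now(). This assumes Airflow 2.2 or later (where data_interval_start and data_interval_end are available as template variables and are pendulum datetimes), an hourly schedule so that the data interval is the one-hour window, and that env_vars is a templated field in the installed KubernetesPodOperator version. The name templated_env_vars is hypothetical.

```python
# Jinja expressions are rendered by Airflow per DAG run, so a rerun of the
# same run should see the same data interval and therefore the same values.
templated_env_vars = {
    # Assumption: hourly schedule, so the interval start is one hour before the end.
    "startTime": "{{ data_interval_start.in_timezone('US/Eastern').strftime('%Y-%m-%d %H:%M:%S') }}",
    "endTime": "{{ data_interval_end.in_timezone('US/Eastern').strftime('%Y-%m-%d %H:%M:%S') }}",
    "timeZone": "US/Eastern",
    "session": "none",
}

if __name__ == "__main__":
    for key, value in templated_env_vars.items():
        print(key, "=", value)
```

The dict would be passed as env_vars=templated_env_vars in the operator above, replacing the startTime.strftime(...) and endTime.strftime(...) calls.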

Thanks


