Airflow - DVC integration in Covid Genomics

Let us introduce ourselves. We are Covid Genomics, a small biotech startup focused on modeling SARS-CoV-2 evolution and optimizing RT-PCR tests.

After we created our first models, a new question emerged: can we refresh the data we use for training models daily? Can we automate data processing easily? The dataset we operate on is highly dynamic: each day about 2.8k new sequences are submitted to the GISAID database. We want to stay up to date with those sequences, so how can we always have the latest data?

Up to this point we had used DVC to version the data, downloaded the sequences manually and ran the processing scripts by hand.

As the automation solution to speed up this process we considered Luigi, but the final decision was to use Apache Airflow. It's mature, easy to set up and fits our requirements perfectly. In Airflow it's possible to create a scheduled, cron-like job that downloads and processes all the data we need without too much hassle.

In this post we will describe how we integrated Airflow with our existing DVC workflow and show how we learned to write custom Airflow operators to build a fully automated pipeline we are proud of.

What is Airflow?

Airflow is an Apache platform to manage, schedule and monitor workflows. It gives you an abstraction layer to create any tasks you want. Whether you are designing an ML model training pipeline or scientific data transformations and aggregations, it's definitely a tool to consider. Please note that Airflow shines at orchestration and dependency management for pipelines. However, if efficient, advanced big-data analytics is a requirement, tools like Spark are much better suited for the job.

What is DVC?

DVC is short for Data Version Control. It lets you version artifacts much like Git LFS, but with additional features such as better support for storage backends (you just need a single S3 bucket to get started), easy-to-understand versioning and simple pipelines.

Why do we want to connect the two?

DVC is great for versioning data; DVC pipelines are useful for rapid iteration on experiments, but too simple for advanced setups. Airflow, on the other hand, is amazing at managing task dependencies and offers a multitude of integrations. It lets you run tasks on your machine, on a Kubernetes cluster, on Spark or inside Docker. The integrations allow you to query databases, read remote files in different formats or send messages to Slack channels.

Naturally, a new idea emerges: use both of them together.

In this setup we will store all of the data in DVC and use Airflow operators to transform, merge or split it, saving all the artifacts back to DVC.

Of course we could use other means of storage, like pushing all of the artifacts to S3. However, DVC offers better file versioning (it's much easier to track changes when all the modifications are tracked by git). Moreover, DVC gives us a simple workflow that is similar to git, so it takes seconds to learn if you already know how git works.
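
If you have never used DVC, the day-to-day workflow really is git-like. A rough sketch (the remote name, bucket and file paths below are just placeholders):

$ dvc init                                       # set up DVC inside an existing git repository
$ dvc remote add -d storage s3://my-bucket/dvc   # configure an S3 bucket as the default remote
$ dvc add data/sequences.fasta                   # start tracking a data file with DVC
$ git add data/sequences.fasta.dvc data/.gitignore
$ git commit -m "Track raw sequences with DVC"
$ dvc push                                       # upload the tracked data to the remote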

Example Apache Airflow flow diagram

How to achieve that?

Airflow pipelines are DAGs defined in Python. Each DAG consists of operators. An operator represents a single task, and how you define it is up to you. Here's a small example in case you are unfamiliar with Airflow:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def foo():
    print("Hello foo")

def bar():
    print("Hello bar")

# Definition of our pipeline
with DAG(
    'simple_test_dag',
    schedule_interval="@daily",
    catchup=True,
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": datetime(2020, 8, 7),
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(seconds=30),
        "sla": timedelta(hours=23),
    },
) as dag:
    # We define tasks
    task_foo = PythonOperator(
        task_id="task_foo",
        python_callable=foo,
    )
    task_bar = PythonOperator(
        task_id="task_bar",
        python_callable=bar,
    )

    # task_bar depends on task_foo
    task_foo >> task_bar

There are operators to run Python code or bash scripts, run Docker images, or do more complicated things like spawning a job on a Kubernetes cluster.
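
For example, running a shell command only requires a different operator class. A minimal sketch that could sit next to the PythonOperator tasks above (the command itself is just an illustration):

from airflow.operators.bash_operator import BashOperator

# A task that runs a shell command instead of Python code
task_baz = BashOperator(
    task_id="task_baz",
    bash_command="echo 'Hello from bash'",
)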

There is a way to upload artifacts to S3, and it's called the S3 hook. It's not documented well, but you can learn how to use it from various examples. Unfortunately, there's no such integration for DVC.
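
For reference, basic S3 hook usage looks roughly like the sketch below (the connection id, bucket and key are placeholders you would configure yourself):

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def save_report_to_s3():
    # 's3-conn' must exist as an Airflow connection with valid AWS credentials
    s3_hook = S3Hook(aws_conn_id="s3-conn")
    # Upload a string as an S3 object...
    s3_hook.load_string("report contents", "reports/report.txt",
                        bucket_name="my-bucket", replace=True)
    # ...and read it back
    return s3_hook.read_key(key="reports/report.txt", bucket_name="my-bucket")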

What can we do about that? Write our own operators!

How do I even run Airflow?

First, we will start with running Airflow locally.

You can clone our repo with prepared examples:

$ git clone https://github.com/covid-genomics/public-examples.git
$ cd public-examples/airflow_setup

You need to have Poetry installed. We use Poetry extensively at Covid Genomics. Installation is straightforward:

$ python3 -m pip install poetry

Now we install dependencies and start Airflow:

$ poetry install
$ ./run_airflow.sh

You can navigate to [localhost:8080](http://localhost:8080) and log in with the credentials:

user: admin
password: admin

Screenshot of the example Airflow DAGs listing

As you can see, you are provided with an extensive list of example pipelines. You can flip the switch next to a DAG name to enable it; the DAG will then start its first run. Feel free to explore the examples.

Now we will write our own DAG. Let’s create a file our_dag.py inside the airflow/dags directory:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def foo():
    print("Hello foo")

def bar():
    print("Hello bar")

# Definition of our pipeline
with DAG(
    'our_dag',
    schedule_interval="@daily",
    catchup=True,
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": datetime(2020, 8, 7),
    },
) as dag:
    # We define tasks
    task_foo = PythonOperator(
        task_id="task_foo",
        python_callable=foo,
    )
    task_bar = PythonOperator(
        task_id="task_bar",
        python_callable=bar,
    )

    task_foo >> task_bar

If you press Ctrl+C and run ./run_airflow.sh again, you will see our_dag on the list of all DAGs. You can run it. The tasks do nothing except print strings (you can find them in the task logs).
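
You can also exercise a single task from the command line without waiting for the scheduler, which is handy while iterating on a DAG (poetry run makes sure the Airflow CLI from the project's virtual environment is used; any past date works as the ad-hoc execution date):

$ poetry run airflow tasks test our_dag task_foo 2021-05-01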

Writing our first operator ever

To implement a DVC operator we could start by extending BaseOperator (the base class for all operators), but since we just want to run Python code, why not extend PythonOperator?

We start by adding the following code to our_dag.py:

# Required imports
from airflow.operators.python_operator import PythonOperator
from airflow.utils.decorators import apply_defaults

# We define our custom operator
class DVCFileDownloadOperator(PythonOperator):
    # These are the required parameters passed to the class constructor;
    # we will explain them below
    dvc_repo: str
    dvc_file: str

    @apply_defaults
    def __init__(
        self,
        dvc_repo: str,
        dvc_file: str,
        **kwargs
    ) -> None:
        # Here we just create a PythonOperator that executes our method
        super().__init__(**kwargs, python_callable=self._execute_operator)
        self.dvc_repo = dvc_repo
        self.dvc_file = dvc_file

    # This method will be executed for the operator
    # For now it does nothing
    def _execute_operator(self, *args, **kwargs) -> None:
        return None

This operator can be used like this:

task_bar = DVCFileDownloadOperator(
    dvc_repo="https://github.com/iterative/dataset-registry",
    dvc_file="get-started/data.xml",
    task_id="task_bar",
)

Okay, now when we run the pipeline, this task will execute our _execute_operator method as its body.

Now, why did we define those two fields, i.e. dvc_repo and dvc_file?

DVC has a Python API to download files from a remote repository:

import dvc.api

with dvc.api.open(
    'get-started/data.xml',
    repo='https://github.com/iterative/dataset-registry'
) as fd:
    # ... fd is a file descriptor that can be processed normally.
    # We can, for example, print the content of the file
    print(fd.read())

The documentation for the DVC API is available here. dvc_repo is the DVC repository URL (the git clone URL) and dvc_file is the path to the file inside the DVC repository.

Downloading our first files

We need to add a field that specifies the output path for the downloaded file, import dvc.api and implement the body of our operator:

import dvc.api

class DVCFileDownloadOperator(PythonOperator):
    dvc_repo: str
    dvc_file: str
    # We add new field to the class
    output_path: str

    @apply_defaults
    def __init__(
        self,
        dvc_repo: str,
        dvc_file: str,
        output_path: str,
        **kwargs
    ) -> None:
        super().__init__(**kwargs, python_callable=self._execute_operator)
        self.dvc_repo = dvc_repo
        self.dvc_file = dvc_file
        self.output_path = output_path

    def _execute_operator(self, *args, **kwargs) -> str:
        # Open remote file
        with dvc.api.open(
            self.dvc_file,
            repo=self.dvc_repo,
        ) as fd:
            # Open local file to write the content
            with open(self.output_path, "w") as out:
                out.write(fd.read())
        # Return the local output path. It's not required, but it can be useful later
        return self.output_path

Now we install dvc and rerun Airflow:

$ poetry add dvc
$ ./run_airflow.sh

If you run the DAG it should output logs similar to this:

-------------------------------------------------------------------------------
[2021-05-27 18:00:55,556] {taskinstance.py:1068} INFO - Starting attempt 1 of 1
[2021-05-27 18:00:55,556] {taskinstance.py:1069} INFO - 
--------------------------------------------------------------------------------
[2021-05-27 18:00:55,561] {taskinstance.py:1087} INFO - Executing <Task(DVCFileDownloadOperator): task_bar> on 2021-02-25T00:00:00+00:00
[2021-05-27 18:00:55,562] {standard_task_runner.py:52} INFO - Started process 231591 to run task
[2021-05-27 18:00:55,564] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'our_dag', 'task_bar', '2021-02-25T00:00:00+00:00', '--job-id', '426', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/our_dag.py', '--cfg-path', '/tmp/tmpr4gbyes7', '--error-file', '/tmp/tmpj97i2mwy']
[2021-05-27 18:00:55,564] {standard_task_runner.py:77} INFO - Job 426: Subtask task_bar
[2021-05-27 18:00:55,577] {logging_mixin.py:104} INFO - Running <TaskInstance: our_dag.task_bar 2021-02-25T00:00:00+00:00 [running]> on host mymachine
[2021-05-27 18:00:55,600] {taskinstance.py:1280} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=our_dag
AIRFLOW_CTX_TASK_ID=task_bar
AIRFLOW_CTX_EXECUTION_DATE=2021-02-25T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-02-25T00:00:00+00:00
[2021-05-27 18:01:00,911] {python.py:151} INFO - Done. Returned value was: file.txt
[2021-05-27 18:01:00,918] {taskinstance.py:1184} INFO - Marking task as SUCCESS. dag_id=our_dag, task_id=task_bar, execution_date=20210225T000000, start_date=20210527T160055, end_date=20210527T160100
[2021-05-27 18:01:00,925] {taskinstance.py:1245} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-05-27 18:01:00,967] {local_task_job.py:151} INFO - Task exited with return code 0

Hey! "Done. Returned value was: file.txt". Our task has succeeded! That's great.

You can also see that in the current directory a new file was created:

$ ls
airflow  file.txt  poetry.lock  pyproject.toml  README.md  run_airflow.sh
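
Because _execute_operator returns the output path, downstream tasks can read it from XCom. A small sketch of such a consumer appended to our_dag.py, assuming Airflow 2.x where context variables like ti are passed to the callable automatically (task_report is just an illustrative name):

def print_downloaded_path(ti):
    # Pull the value returned by DVCFileDownloadOperator (task_bar) from XCom
    path = ti.xcom_pull(task_ids="task_bar")
    print(f"DVC file was saved to: {path}")

task_report = PythonOperator(
    task_id="task_report",
    python_callable=print_downloaded_path,
)

task_bar >> task_report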

Full example

Here's the full example we wrote. It's also available in the GitHub repo:

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import dvc.api

class DVCFileDownloadOperator(PythonOperator):
    dvc_repo: str
    dvc_file: str
    output_path: str

    def __init__(
        self,
        dvc_repo: str,
        dvc_file: str,
        output_path: str,
        **kwargs
    ) -> None:
        super().__init__(**kwargs, python_callable=self._execute_operator)
        self.dvc_repo = dvc_repo
        self.dvc_file = dvc_file
        self.output_path = output_path

    def _execute_operator(self, *args, **kwargs) -> str:
        with dvc.api.open(
            self.dvc_file,
            repo=self.dvc_repo,
        ) as fd:
            with open(self.output_path, "w") as out:
                out.write(fd.read())
        return self.output_path

    
def foo():
    print("Hello foo")

    
# Definition of our pipeline
with DAG(
    'our_dag',
    schedule_interval="@daily",
    catchup=True,
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": datetime(2020, 8, 7),
    },
) as dag:
    # We define tasks
    task_foo = PythonOperator(
        task_id="task_foo",
        python_callable=foo,
    )
    task_bar = DVCFileDownloadOperator(
        dvc_repo="https://github.com/iterative/dataset-registry",
        dvc_file="get-started/data.xml",
        output_path="file.txt",
        task_id="task_bar",
    )

    task_foo >> task_bar

In reality, DVC operations are not that simple. The API allows us to download file contents, but it offers no way to upload files. At Covid Genomics we overcame this limitation by creating dvc-fs. It's a simple wrapper around DVC with a few tricks that make it easy to upload or download files.

We wrapped this interface into a rich plugin that you can use in your pipelines.

Airflow DVC

You can install the DVC plugin we created with the following command:

  $ python3 -m pip install airflow-dvc

Or if you are using Poetry to run Apache Airflow:

  $ poetry add apache-airflow@latest
  $ poetry add airflow-dvc@latest

What does the package contain?

Starting from the operator described in this article, we created download and update operators as well as a DVC hook. Put simply, hooks in Apache Airflow are classes that provide specific utilities, like the S3 hook. All our operators call the hook under the hood and, of course, if your use case exceeds what the operators can do, you can use the hook directly.

We also created a few useful sensors for DVC that detect file updates.

Moreover, we wrapped all these features in an Airflow plugin, adding new views to the Airflow interface.

📊 DVC Operator view

After installation, you should be able to access the Browse > DVC Operators option in the Airflow menu.

Airflow menu screenshot

The DVC Operators view displays all configured DVC operators and the repositories they push files to or pull files from. This view is useful if you have a complicated configuration with many separate DVC repositories. Splitting data between repositories helps when you have many ad-hoc research projects, each with a very specific, hardly reusable data format.

In our case we maintain a separate DVC repository per research project and one big data repository for all the raw sequences and reusable aggregations.

DVC Operators view screenshot

The DVC Pushes view displays all commits created by the DVC operators across all repositories:

DVC Pushes view screenshot

This is still a work in progress, but the view already lets you track recent changes made by the operators.

💾 How to upload a file to DVC?

The upload operator supports various types of data inputs that you can feed into it.

Uploading a string as a file: In this case we generate the content of the file from a string. This option may not seem very practical, but you can use Airflow templates in most of the operators' parameters, like the input/output path or, in this case, the file content. With the power of Airflow's Jinja templating you can use DVCStringUpload to generate the file content from the result of another task:

from airflow_dvc import DVCUpdateOperator, DVCStringUpload
from datetime import datetime

upload_task = DVCUpdateOperator(
    dvc_repo="<REPO_CLONE_URL>",
    files=[
        DVCStringUpload("data/1.txt", f"This will be saved into DVC. Current time: {datetime.now()}"),
    ],
    task_id='update_dvc',
)

If you want to access the value returned by another task, you can create the following upload object:

DVCStringUpload("data.txt", "{{ task_instance.xcom_pull(task_ids='some_task_id') }}"),

Uploading a local file using its path: This use case is more standard. You want to upload a file that exists locally:

from airflow_dvc import DVCUpdateOperator, DVCPathUpload

upload_task = DVCUpdateOperator(
    dvc_repo="<REPO_CLONE_URL>",
    files=[
        DVCPathUpload("data/1.txt", "~/local_file_path.txt"),
    ],
    task_id='update_dvc',
)

Uploading content generated by a Python function: In this example the content of the file is produced by a function. You can imagine a workflow where the content is determined by a response from a database/API or is not known when the DAG is created:

from airflow_dvc import DVCUpdateOperator, DVCCallbackUpload

upload_task = DVCUpdateOperator(
    dvc_repo="<REPO_CLONE_URL>",
    files=[
        DVCCallbackUpload("data/1.txt", lambda: "Test data"),
    ],
    task_id='update_dvc',
)

Uploading a file from S3: This is especially useful when you have a workflow that uses the S3 hook to temporarily store data between tasks. DVC operators can download or upload files from S3 very easily; you just need to specify the connection details (as with the S3 hook).

You may wonder what happens if we use S3 as temporary storage for files shared between tasks while DVC also uses an S3 backend. In this case we effectively download a file from S3 only to re-upload it to a different S3 bucket managed by DVC. It may seem unreasonable, but DVC's purpose is to store the final products of the pipeline, not the byproducts of tasks. Moreover, S3 is a popular place to store temporary data that does not fit into XCom.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta

from io import StringIO
import pandas as pd
import requests

from airflow_dvc import DVCUpdateOperator, DVCS3Upload

s3_conn_id = 's3-conn'
bucket = 'astro-workshop-bucket'
state = 'wa'
date = '{{ yesterday_ds_nodash }}'

def upload_to_s3(state, date):
    '''Grabs data from Covid endpoint and saves to flat file on S3
    '''
    # Connect to S3
    s3_hook = S3Hook(aws_conn_id=s3_conn_id)

    # Get data from API
    url = 'https://covidtracking.com/api/v1/states/'
    res = requests.get(url+'{0}/{1}.csv'.format(state, date))

    # Save data to CSV on S3
    s3_hook.load_string(res.text, '{0}_{1}.csv'.format(state, date), bucket_name=bucket, replace=True)

def process_data(state, date):
    '''Reads data from S3, processes, and saves to new S3 file
    '''
    # Connect to S3
    s3_hook = S3Hook(aws_conn_id=s3_conn_id)

    # Read data
    data = StringIO(s3_hook.read_key(key='{0}_{1}.csv'.format(state, date), bucket_name=bucket))
    df = pd.read_csv(data, sep=',')

    # Process data
    processed_data = df[['date', 'state', 'positive', 'negative']]

    # Save processed data to CSV on S3
    s3_hook.load_string(processed_data.to_string(), 'dvc_upload.csv', bucket_name=bucket, replace=True)

# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

with DAG('intermediary_data_storage_dag',
         start_date=datetime(2021, 1, 1),
         max_active_runs=1,
         schedule_interval='@daily',
         default_args=default_args,
         catchup=False
         ) as dag:

    generate_file = PythonOperator(
        task_id='generate_file_{0}'.format(state),
        python_callable=upload_to_s3,
        op_kwargs={'state': state, 'date': date}
    )

    process_data = PythonOperator(
        task_id='process_data_{0}'.format(state),
        python_callable=process_data,
        op_kwargs={'state': state, 'date': date}
    )

    upload_to_dvc = DVCUpdateOperator(
        dvc_repo="<REPO_CLONE_URL>",
        files=[
            DVCS3Upload("dvc_path/data.txt", s3_conn_id, bucket, 'dvc_upload.csv'),
        ],
        task_id='update_dvc',
    )

    generate_file >> process_data
    process_data >> upload_to_dvc

Uploading files from S3 using task arguments: Instead of passing a list as the files parameter you can pass a function, which lets us define the list of files at runtime. It's similar to what we did with DVCCallbackUpload, but here we dynamically define the list of files, not a single path. Please note that in most cases the Airflow templating system is enough and you don't have to use functions at all. This feature does not work well with the DVC Pushes view and should be used only when there is no other option (e.g. you upload a batch of files whose number is dynamic and can't be deduced when the DAG is created).

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta

from io import StringIO
import pandas as pd
import requests

from airflow_dvc import DVCUpdateOperator, DVCS3Upload

s3_conn_id = 's3-conn'
bucket = 'astro-workshop-bucket'
state = 'wa'
date = '{{ yesterday_ds_nodash }}'

def upload_to_s3(state, date):
    '''Grabs data from Covid endpoint and saves to flat file on S3
    '''
    # Connect to S3
    s3_hook = S3Hook(aws_conn_id=s3_conn_id)

    # Get data from API
    url = 'https://covidtracking.com/api/v1/states/'
    res = requests.get(url+'{0}/{1}.csv'.format(state, date))

    # Save data to CSV on S3
    s3_hook.load_string(res.text, '{0}_{1}.csv'.format(state, date), bucket_name=bucket, replace=True)

def process_data(state, date):
    '''Reads data from S3, processes, and saves to new S3 file
    '''
    # Connect to S3
    s3_hook = S3Hook(aws_conn_id=s3_conn_id)

    # Read data
    data = StringIO(s3_hook.read_key(key='{0}_{1}.csv'.format(state, date), bucket_name=bucket))
    df = pd.read_csv(data, sep=',')

    # Process data
    processed_data = df[['date', 'state', 'positive', 'negative']]

    # Save processed data to CSV on S3
    s3_hook.load_string(processed_data.to_string(), '{0}_{1}_processed.csv'.format(state, date), bucket_name=bucket, replace=True)

def get_files_for_upload(state, date):
    return [
        DVCS3Upload("dvc_path/data.txt", s3_conn_id, bucket, '{0}_{1}_processed.csv'.format(state, date)),
    ]

# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

with DAG('intermediary_data_storage_dag',
         start_date=datetime(2021, 1, 1),
         max_active_runs=1,
         schedule_interval='@daily',
         default_args=default_args,
         catchup=False
         ) as dag:

    generate_file = PythonOperator(
        task_id='generate_file_{0}'.format(state),
        python_callable=upload_to_s3,
        op_kwargs={'state': state, 'date': date}
    )

    process_data = PythonOperator(
        task_id='process_data_{0}'.format(state),
        python_callable=process_data,
        op_kwargs={'state': state, 'date': date}
    )

    # Passing a function as the files parameter (it should return a list of DVCUpload objects)
    # We also specify op_kwargs to pass parameters, as with a normal PythonOperator
    upload_to_dvc = DVCUpdateOperator(
        dvc_repo="<REPO_CLONE_URL>",
        files=get_files_for_upload,
        task_id='update_dvc',
        op_kwargs={'state': state, 'date': date}
    )

    generate_file >> process_data
    process_data >> upload_to_dvc

⬇️ Downloading files from DVC

We can use DVCDownloadOperator similarly to the DVCUpdateOperator. The syntax is the same:

from airflow_dvc import DVCDownloadOperator, DVCCallbackDownload

# Download DVC file data/1.txt and print it on the screen
download_task = DVCDownloadOperator(
    dvc_repo="<REPO_CLONE_URL>",
    files=[
        DVCCallbackDownload("data/1.txt", lambda content: print(content)),
    ],
    task_id='download_dvc',
)

We won't describe all the possible combinations, as the usage is almost the same as for uploads.
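
For completeness, here is a sketch of downloading straight to a local path, assuming the download helpers mirror the upload classes (DVCPathDownload is named here by analogy to DVCPathUpload; check the plugin's README for the exact class names):

from airflow_dvc import DVCDownloadOperator, DVCPathDownload

download_to_disk = DVCDownloadOperator(
    dvc_repo="<REPO_CLONE_URL>",
    files=[
        # Fetch data/1.txt from DVC and write it to a local file
        DVCPathDownload("data/1.txt", "~/downloaded_file.txt"),
    ],
    task_id="download_dvc_file",
)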

👀 Sensors

This is a feature that wasn't discussed in this post. If you are interested in more details on how sensors in Airflow work, check our blog soon: we will publish more tutorials on writing Airflow sensors and creating custom plugins.

Generally speaking, DVCUpdateSensor allows you to pause a DAG run until the specified file is updated. It's useful when part of the DVC repository is updated by an external process independent of the executed DAG.

One such example would be a DAG that waits for a human to update a file in DVC before updating other assets in the repository.

The sensor checks the date of the latest DAG run and compares it with the timestamp of the DVC metadata file in the repo.

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator

from airflow_dvc import DVCUpdateSensor

with DAG('dvc_sensor_example', description='Another tutorial DAG',
    start_date=datetime(2017, 3, 20),
    catchup=False,
) as dag:

    dummy_task = DummyOperator(task_id='dummy_task', dag=dag)

    sensor_task = DVCUpdateSensor(
        task_id='dvc_sensor_task',
        dag=dag,
        dvc_repo="<REPO_CLONE_URL>",
        files=["data/1.txt"],
    )

    task = BashOperator(
        task_id='task_triggered_by_sensor',
        bash_command='echo "OK" && ( echo $[ ( $RANDOM % 30 )  + 1 ] > meowu.txt ) && cat meowu.txt')

    dummy_task >> sensor_task >> task

Summary

We use the DVC Airflow plugin extensively at Covid Genomics, and we hope it will be useful to others as well.

The repositories for the airflow-dvc plugin and dvc-fs are publicly available on our GitHub.

If you want to run the examples shown in this post, you can access them here: https://github.com/covid-genomics/public-examples

Piotr Styczyński

Chief Engineering Officer (CEngO)
