
giftwrap

GIFTwrap: A Python package for analyzing GIFT-seq data.

The package provides both a CLI for transforming FASTQ files into counts matrices and a Python API for analysis.

Modules:

  • analysis

    This module provides a collection of Python APIs for analyzing processed GIFT-seq data.

  • pipeline
  • pl

    This module provides plotting functions for visualizing gapfill and genotype data in AnnData objects.

  • pp

    This module provides functions to handle basic preprocessing tasks of GIFT-seq data including

  • sp

    This module contains functions for spatial analysis of Visium GIFT-seq data.

  • step1_count_gapfills
  • step3_correct_gapfill
  • step4_collect_counts
  • tl

    This module provides various tools for analyzing and manipulating a GIFT-seq dataset. Most true analysis tools live

  • utils

Classes:

  • TechnologyFormatInfo

Functions:

  • filter_h5_file_by_barcodes
  • read_h5_file
  • sequence_saturation_curve
  • sequencing_saturation

TechnologyFormatInfo

TechnologyFormatInfo(barcode_dir: Optional[str | Path] = None, read1_length: Optional[int] = None, read2_length: Optional[int] = None)

Bases: ABC

Generic class to hold metadata related to parsing Read1 and Read2.

Methods:

  • barcode2coordinates
  • correct_barcode
  • make_barcode_string
  • probe_barcode_index

Attributes:

  • barcode_coordinates
  • barcode_tree
  • cell_barcode_start
  • cell_barcodes
  • constant_sequence
  • constant_sequence_start
  • has_constant_sequence
  • has_probe_barcode
  • is_spatial
  • max_cell_barcode_length
  • probe_barcode_R1
  • probe_barcode_length
  • probe_barcode_start
  • probe_barcodes
  • read1_length
  • read2_length
  • umi_length
  • umi_start

Source code in src/giftwrap/utils.py
def __init__(self,
             barcode_dir: Optional[str | Path] = None,
             read1_length: Optional[int] = None,
             read2_length: Optional[int] = None):
    self._read1_length = read1_length
    self._read2_length = read2_length

    if barcode_dir:
        self._barcode_dir = Path(barcode_dir)
    else:
        # Fallback to our resources directory
        self._barcode_dir = Path(__file__).parent / "resources"
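
Concrete technologies subclass this ABC and supply their format constants through the abstract properties. A minimal self-contained analogue of the pattern (`FormatInfoBase`, `ToyTechFormat`, and the values here are illustrative, not part of giftwrap):

```python
from abc import ABC, abstractmethod

# Illustrative analogue of the TechnologyFormatInfo pattern: the base class
# declares abstract properties, and each technology subclass returns its
# own constants. Names and values are hypothetical.
class FormatInfoBase(ABC):
    @property
    @abstractmethod
    def cell_barcode_start(self) -> int:
        """Start position of the cell barcode in the read."""

    @property
    @abstractmethod
    def umi_length(self) -> int:
        """Length of the UMI sequence on R1."""


class ToyTechFormat(FormatInfoBase):
    @property
    def cell_barcode_start(self) -> int:
        return 0  # barcode begins at the first base

    @property
    def umi_length(self) -> int:
        return 12  # hypothetical 12 bp UMI


fmt = ToyTechFormat()
print(fmt.cell_barcode_start, fmt.umi_length)  # 0 12
```

Instantiating the base class directly raises a TypeError until every abstract property is implemented, which is what forces each technology to declare a complete format.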

barcode_coordinates abstractmethod property

barcode_coordinates: dict[str, tuple[int, int]]

The x and y coordinates of the barcode in the read.

barcode_tree cached property

barcode_tree: PrefixTrie

Return a prefix tree (trie) of the cell barcodes for fast mismatch searches.

Returns:

  • PrefixTrie

    The tree.

cell_barcode_start abstractmethod property

cell_barcode_start: int

The start position of the cell barcode in the read.

cell_barcodes abstractmethod property

cell_barcodes: list[str]

The list of potential barcodes.

constant_sequence abstractmethod property

constant_sequence: str

The constant sequence that is expected in the read.

constant_sequence_start abstractmethod property

constant_sequence_start: int

The start position of the constant sequence in the read. Note that this should be relative to the end of the read insert. For example, in 10X flex, 0 would be the first base after the LHS + gapfill + RHS.

has_constant_sequence abstractmethod property

has_constant_sequence: bool

Whether the read has a constant sequence.

has_probe_barcode abstractmethod property

has_probe_barcode: bool

Whether the read has a probe barcode.

is_spatial abstractmethod property

is_spatial: bool

Whether the technology is spatial. If true, then barcode_coordinates() must be defined.

max_cell_barcode_length cached property

max_cell_barcode_length: int

Returns the maximum length of a cell barcode.

probe_barcode_R1 property

probe_barcode_R1: bool

If true, the probe sample barcode is on R1 instead of R2.

probe_barcode_length abstractmethod property

probe_barcode_length: int

The length of the probe barcode.

probe_barcode_start abstractmethod property

probe_barcode_start: int

The start position of the probe barcode in the read. Note that this should be relative to the end of the constant sequence insert. For example, in 10X flex, 2 would be the first base after the constant sequence+NN.

probe_barcodes abstractmethod property

probe_barcodes: dict[str, str]

The mapping of potential probe barcodes.

read1_length property

read1_length: Optional[int]

The expected length of each R1 read. If defined, the pipeline can use it to improve performance.

Returns:

  • Optional[int]

    The length, or None if not defined.

read2_length property

read2_length: Optional[int]

The expected length of each R2 read. If defined, the pipeline can use it to improve performance.

Returns:

  • Optional[int]

    The length, or None if not defined.

umi_length abstractmethod property

umi_length: int

The length of the UMI sequence on R1.

umi_start abstractmethod property

umi_start: int

The start position of the UMI sequence in R1.

barcode2coordinates cached

barcode2coordinates(barcode: str) -> tuple[int, int]

Returns the X and Y coordinates of a barcode.

Parameters:

  • barcode

    (str) –

    The barcode.

Source code in src/giftwrap/utils.py
@functools.lru_cache(maxsize=1000)
def barcode2coordinates(self, barcode: str) -> tuple[int, int]:
    """
    Returns the X and Y coordinates of a barcode.
    :param barcode: The barcode.
    """
    return self.barcode_coordinates[barcode]

correct_barcode cached

correct_barcode(read: str, max_mismatches: int, start_idx: int, end_idx: int) -> tuple[Optional[str], int]

Given a probable barcode string, attempt to correct the sequence.

Parameters:

  • read

    (str) –

    The barcode-containing sequence.

  • max_mismatches

    (int) –

    The maximum number of mismatches to allow.

  • start_idx

    (int) –

    The start index of the barcode in the read.

  • end_idx

    (int) –

    The end index of the barcode in the read.

Returns:

  • tuple[Optional[str], int]

    The corrected barcode (or None if no match was found) and the number of corrections required.

Source code in src/giftwrap/utils.py
@functools.lru_cache(1024)
def correct_barcode(self, read: str, max_mismatches: int, start_idx: int, end_idx: int) -> tuple[Optional[str], int]:
    """
    Given a probable barcode string, attempt to correct the sequence.
    :param read: The barcode-containing sequence.
    :param max_mismatches: The maximum number of mismatches to allow.
    :param start_idx: The start index of the barcode in the read.
    :param end_idx: The end index of the barcode in the read.
    :return: The corrected barcode (or None if no match was found) and the number of corrections required.
    """
    return self.barcode_tree.search(read[start_idx:end_idx], max_mismatches)
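
The trie search itself is internal (`PrefixTrie`), but its contract can be illustrated with a brute-force stand-in that scans a whitelist for the closest barcode within the mismatch budget. This is a sketch: the real implementation prunes the search with a prefix tree, and its no-match return value and ambiguity handling may differ.

```python
from typing import Optional

# Brute-force stand-in for the PrefixTrie search behind correct_barcode:
# scan a whitelist for the closest same-length barcode within the mismatch
# budget. The (None, -1) no-match convention is an assumption for this sketch.
def search_whitelist(query: str, whitelist: list[str],
                     max_mismatches: int) -> tuple[Optional[str], int]:
    best, best_dist = None, max_mismatches + 1
    for bc in whitelist:
        if len(bc) != len(query):
            continue
        # Hamming distance between the query and the candidate barcode
        dist = sum(a != b for a, b in zip(query, bc))
        if dist < best_dist:
            best, best_dist = bc, dist
    return (best, best_dist) if best is not None else (None, -1)


print(search_whitelist("AACA", ["AAAA", "CCCC", "GGGG"], 1))  # ('AAAA', 1)
print(search_whitelist("ACGT", ["AAAA", "CCCC", "GGGG"], 1))  # (None, -1)
```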

make_barcode_string

make_barcode_string(cell_barcode: str, plex: str = '1', x_coord: Optional[int] = None, y_coord: Optional[int] = None, is_multiplexed: bool = False) -> str

Format a cell barcode into a string.

Parameters:

  • cell_barcode

    (str) –

    The barcode.

  • plex

    (str, default: '1' ) –

    The barcode index used to represent demultiplexed cells.

  • x_coord

    (Optional[int], default: None ) –

    The x coordinate.

  • y_coord

    (Optional[int], default: None ) –

    The y coordinate.

  • is_multiplexed

    (bool, default: False ) –

    Whether the data is multiplexed.

Source code in src/giftwrap/utils.py
def make_barcode_string(self, cell_barcode: str, plex: str = "1", x_coord: Optional[int] = None, y_coord: Optional[int] = None, is_multiplexed: bool = False) -> str:
    """
    Format a cell barcode into a string.
    :param cell_barcode: The barcode.
    :param plex: The barcode index used to represent demultiplexed cells.
    :param x_coord: The x coordinate.
    :param y_coord: The y coordinate.
    :param is_multiplexed: Whether the data is multiplexed.
    """
    return f"{cell_barcode}-{plex}"  # Naive multiplexed barcode

probe_barcode_index abstractmethod

probe_barcode_index(bc: str) -> int

Convert a probe barcode to an index.

Source code in src/giftwrap/utils.py
@abstractmethod
def probe_barcode_index(self, bc: str) -> int:
    """
    Convert a probe barcode to an index.
    """
    raise NotImplementedError()

filter_h5_file_by_barcodes

filter_h5_file_by_barcodes(input_file: Path, output_file: Path, barcodes_list: ArrayLike, pad_matrix: bool = True)

Given a counts h5 file and a list of barcodes, filter the file to include only the barcodes in the list.

Parameters:

  • input_file

    (Path) –

    The input h5 file.

  • output_file

    (Path) –

    The output h5 file.

  • barcodes_list

    (ArrayLike) –

    The barcodes list.

  • pad_matrix

    (bool, default: True ) –

    Whether to pad the matrix with zeros if there are barcodes provided that don't exist in the file.

Source code in src/giftwrap/utils.py
def filter_h5_file_by_barcodes(input_file: Path, output_file: Path, barcodes_list: ArrayLike, pad_matrix: bool = True):
    """
    Given a counts h5 file and a list of barcodes, filter the file to include only the barcodes in the list.
    :param input_file: The input h5 file.
    :param output_file: The output h5 file.
    :param barcodes_list: The barcodes list.
    :param pad_matrix: Whether to pad the matrix with zeros if there are barcodes provided that don't exist in the file.
    """
    # First, copy the file
    shutil.copy(input_file, output_file)

    # Convert barcodes_list to a set for O(1) lookups
    barcodes_set = set(barcodes_list)

    # Then open the file
    with h5py.File(output_file, 'r+') as f:
        # Read barcodes once
        barcodes_array = f['matrix']['barcode'][:]
        barcodes = np.char.decode(barcodes_array, 'utf-8') if barcodes_array.dtype.kind == 'S' else barcodes_array.astype(str)

        # Use vectorized numpy operations for finding indices
        # Create a boolean mask for matching barcodes
        mask = np.isin(barcodes, list(barcodes_set))
        barcode_indices = np.where(mask)[0]

        # Check if we need to filter the data
        if len(barcode_indices) == len(barcodes):
            return  # Equal size, no point in filtering

        if len(barcode_indices) == 0:
            raise ValueError("No barcodes found in the file.")

        # Calculate padded barcodes only if needed
        padded_barcodes = []
        if pad_matrix and len(barcodes_set) > len(barcode_indices):
            existing_barcodes = set(barcodes)
            padded_barcodes = np.array([bc for bc in barcodes_list if bc not in existing_barcodes], dtype='S')
            print(f"Padding {len(padded_barcodes)} unseen cells with zeroes.")

        # Filter the data
        filtered_barcodes = barcodes[barcode_indices]
        del f['matrix']['barcode']

        # Convert to bytes only once if needed
        if filtered_barcodes.dtype.kind != 'S':
            filtered_barcodes = np.array(filtered_barcodes, dtype='S')

        # Create the new dataset
        if len(padded_barcodes) > 0:
            if padded_barcodes.dtype.kind != 'S':
                padded_barcodes = np.array(padded_barcodes, dtype='S')
            combined_barcodes = np.concatenate([filtered_barcodes, padded_barcodes])
        else:
            combined_barcodes = filtered_barcodes

        f['matrix'].create_dataset("barcode", data=combined_barcodes, compression='gzip')

        # Filter cell_index if it exists
        if 'cell_index' in f['matrix']:
            cell_indices = f['matrix']['cell_index'][:]
            del f['matrix']['cell_index']
            # For padded barcodes, we'll use -1 as a placeholder for missing original indices
            if len(padded_barcodes) > 0:
                padded_cell_indices = np.full(len(padded_barcodes), -1, dtype=np.int32)
                combined_indices = np.concatenate([cell_indices[barcode_indices], padded_cell_indices])
            else:
                combined_indices = cell_indices[barcode_indices]
            f['matrix'].create_dataset("cell_index", data=combined_indices, compression='gzip')

        # Process each sparse matrix layer
        for layer_name in ['data', 'total_reads', 'percent_supporting']:
            data = read_sparse_matrix(f['matrix'], layer_name)
            # Efficiently extract rows
            data = data[barcode_indices, :]

            # Add padding only if needed
            if len(padded_barcodes) > 0:
                # Create empty sparse matrix only once with correct dimensions
                padding = scipy.sparse.csr_matrix((len(padded_barcodes), data.shape[1]), dtype=data.dtype)
                data = scipy.sparse.vstack([data, padding])

            del f['matrix'][layer_name]
            write_sparse_matrix(f['matrix'], layer_name, data)

        # If PCR-thresholded counts are present, filter them as well
        if 'max_pcr_duplicates' in f.attrs:
            max_pcr_dups = int(f.attrs['max_pcr_duplicates'])
            if max_pcr_dups > 1:
                all_pcr_grp = f["pcr_thresholded_counts"]
                for pcr_dup in range(1, max_pcr_dups):
                    layer_name = f"pcr{pcr_dup}"
                    if layer_name in all_pcr_grp:
                        data = read_sparse_matrix(all_pcr_grp, layer_name)
                        data = data[barcode_indices, :]

                        # Add padding only if needed
                        if len(padded_barcodes) > 0:
                            # Create empty sparse matrix only once with correct dimensions
                            padding = scipy.sparse.csr_matrix((len(padded_barcodes), data.shape[1]), dtype=data.dtype)
                            data = scipy.sparse.vstack([data, padding])

                        del all_pcr_grp[layer_name]
                        write_sparse_matrix(all_pcr_grp, layer_name, data)

        # Process cell metadata more efficiently
        if 'cell_metadata' in f:
            obs_meta_columns = f['cell_metadata']['columns'][:].astype(str)

            # Create a dictionary to hold column data
            meta_dict = {}
            for col in obs_meta_columns:
                values = f['cell_metadata'][col][:]
                if col == 'barcode':
                    if values.dtype.kind == 'S':
                        values = np.char.decode(values, 'utf-8')
                    meta_dict[col] = values
                else:
                    meta_dict[col] = values

            # Create filtered DataFrame
            obs_meta_df = pd.DataFrame(meta_dict)
            if 'barcode' in obs_meta_df.columns:
                obs_meta_df.set_index('barcode', inplace=True)
                filtered_meta = obs_meta_df.loc[barcodes[barcode_indices]].reset_index()
            else:
                filtered_meta = obs_meta_df.iloc[barcode_indices].reset_index(drop=True)

            # Add padding if needed
            if len(padded_barcodes) > 0:
                pad_dict = {col: ([pd.NA] * len(padded_barcodes)) for col in filtered_meta.columns if col != 'barcode'}
                if 'barcode' in filtered_meta.columns:
                    pad_dict['barcode'] = padded_barcodes
                pad_df = pd.DataFrame(pad_dict)
                filtered_meta = pd.concat([filtered_meta, pad_df], ignore_index=True)

            # Write back to file
            del f['cell_metadata']
            cell_metadata_grp = f.create_group('cell_metadata')
            cell_metadata_grp.create_dataset('columns', data=np.array(filtered_meta.columns, dtype='S'), compression='gzip')

            for col in filtered_meta.columns:
                values = filtered_meta[col].values
                # Convert to appropriate type
                if not np.issubdtype(values.dtype, np.number):
                    values = np.array(values, dtype='S')
                cell_metadata_grp.create_dataset(col, data=values, compression='gzip')

        # Update cell count
        if 'n_cells' in f.attrs:
            del f.attrs['n_cells']
            f.attrs['n_cells'] = len(combined_barcodes)
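
The core filter-and-pad step can be exercised in isolation with numpy and scipy (a simplified sketch of the h5 logic above; the barcodes and counts are toy values): keep rows whose barcode was requested, then append all-zero rows for requested barcodes absent from the matrix.

```python
import numpy as np
import scipy.sparse

# Toy data: three barcodes in the file, three requested (one unseen).
barcodes = np.array(["AAAA-1", "CCCC-1", "GGGG-1"])
requested = ["AAAA-1", "GGGG-1", "TTTT-1"]  # TTTT-1 is not in the file

counts = scipy.sparse.csr_matrix(np.arange(12).reshape(3, 4))

# Boolean mask of rows to keep, as in filter_h5_file_by_barcodes
mask = np.isin(barcodes, requested)
kept = counts[np.where(mask)[0], :]

# Zero-pad for requested barcodes that don't exist in the file
unseen = [bc for bc in requested if bc not in set(barcodes)]
padding = scipy.sparse.csr_matrix((len(unseen), counts.shape[1]), dtype=counts.dtype)
padded = scipy.sparse.vstack([kept, padding])

print(padded.shape)          # (3, 4)
print(padded.toarray()[-1])  # [0 0 0 0]
```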

read_h5_file

read_h5_file(filename: str | Path) -> ad.AnnData

Read a generated h5 file and return an AnnData object.

Parameters:

  • filename

    (str | Path) –

    The filename.

Returns:

  • AnnData

    The AnnData object.

Source code in src/giftwrap/utils.py
def read_h5_file(filename: str | Path) -> ad.AnnData:
    """
    Read a generated h5 file and return an AnnData object.
    :param filename: The filename.
    :return: The AnnData object.
    """
    with h5py.File(filename, 'r') as f:
        X = read_sparse_matrix(f['matrix'], 'data')
        layers = {
            'total_reads': read_sparse_matrix(f['matrix'], 'total_reads'),  # Total umis encountered
            'percent_supporting': read_sparse_matrix(f['matrix'], 'percent_supporting'),  # Avg percent of umis supporting the gapfill call
        }
        var_df = pd.DataFrame({
            'probe': f['matrix']['probe'][:, 0].astype(str),
            'gapfill': f['matrix']['probe'][:, 1].astype(str),
        })

        # Add original probe indices if available
        if 'probe_index' in f['matrix']:
            var_df['probe_index'] = f['matrix']['probe_index'][:].astype(int)

        obs_df = pd.DataFrame({
            'barcode': f['matrix']['barcode'][:].astype(str),
        }).set_index('barcode')

        # Add original cell indices if available
        if 'cell_index' in f['matrix']:
            obs_df['cell_index'] = f['matrix']['cell_index'][:].astype(int)

        # Read the obs metadata
        obs_meta_columns = f['cell_metadata']['columns'][:].astype(str)
        obs_meta_df = dict()
        for column in obs_meta_columns:
            values = f['cell_metadata'][column][:]
            if column == 'barcode':
                values = values.astype(str)
            else:
                try:
                    values = values.astype(int)  # Most metadata are ints
                except (ValueError, TypeError):  # If that doesn't work, try string
                    try:
                        values = values.astype(str)
                    except (ValueError, TypeError):
                        values = np.zeros_like(values, dtype=int)  # Give up
            obs_meta_df[column] = values
        obs_meta_df = pd.DataFrame(obs_meta_df).set_index("barcode")

        obs_df = obs_df.merge(obs_meta_df, on='barcode', how='left')

        manifest = pd.DataFrame({
            'probe': f['probe_metadata']['name'][:].astype(str),
            'lhs_probe': f['probe_metadata']['lhs_probe'][:].astype(str),
            'rhs_probe': f['probe_metadata']['rhs_probe'][:].astype(str),
            'gap_probe_sequence': f['probe_metadata']['gap_probe_sequence'][:].astype(str),
            'original_gap_probe_sequence': f['probe_metadata']['original_sequence'][:].astype(str),
        })
        if 'gene' in f['probe_metadata']:
            manifest['gene'] = f['probe_metadata']['gene'][:].astype(str)

        # Check if probe names are unique on the manifest
        if len(manifest['probe'].unique()) != len(manifest):
            raise ValueError("Probe names are not unique.")

        # Add reference to var_df
        var_df = var_df.merge(manifest, on='probe', how='left')
        var_df = var_df.rename(columns={'gap_probe_sequence': 'expected_gapfill', 'original_gap_probe_sequence': 'reference_gapfill'})
        var_df['probe_gapfill'] = var_df['probe'].str.cat(var_df['gapfill'], sep='|')
        var_df = var_df.set_index('probe_gapfill', drop=True)

        adata = ad.AnnData(X,
                           layers=layers,
                           obs=obs_df,
                           var=var_df,
                           uns={
                                "probe_metadata": manifest,
                                "plex": f.attrs['plex'],
                                "project": f.attrs['project'],
                                "created_date": f.attrs['created_date'], #pd.Timestamp(f.attrs['created_date']),
                                "n_cells": f.attrs['n_cells'],
                                "n_probes": f.attrs['n_probes'],
                                "n_probe_gapfill_combinations": f.attrs['n_probe_gapfill_combinations'],
                                "max_pcr_duplicates": f.attrs['max_pcr_duplicates'] if 'max_pcr_duplicates' in f.attrs else -1,
                           })

        if 'max_pcr_duplicates' in f.attrs and int(f.attrs['max_pcr_duplicates']) > 1:
            # We must read the PCR thresholds and save the counts matrices for each threshold to the layers
            dup_grp = f['pcr_thresholded_counts']
            for threshold in range(1, f.attrs['max_pcr_duplicates']):
                adata.layers[f'X_pcr_threshold_{threshold}'] = read_sparse_matrix(dup_grp, f'pcr{threshold}')

    # Check if array_col and array_row exist in obs
    # If present, verify that all are integers
    if 'array_col' in adata.obs.columns and 'array_row' in adata.obs.columns:
        col_mask = adata.obs['array_col'].isnull() | (~np.issubdtype(adata.obs['array_col'].dtype, np.integer))
        row_mask = adata.obs['array_row'].isnull() | (~np.issubdtype(adata.obs['array_row'].dtype, np.integer))
        if col_mask.any() or row_mask.any():
            # We will need to regenerate only the problematic array_col and array_row values
            print("Warning: 'array_col' and 'array_row' in obs contain non-integer or null values. Regenerating problematic values.")
            # Create masks for problematic values
            problematic_mask = col_mask | row_mask

            if problematic_mask.any():
                # Vectorized parse from index -> base part before '-'
                idx_series = pd.Series(adata.obs.index.astype(str), index=adata.obs.index)
                base = idx_series.str.split('-', n=1).str[0]
                # Extract last two underscore-delimited tokens
                parts = base.str.rsplit('_', n=2, expand=True)
                if parts.shape[1] < 3:
                    parts = parts.reindex(columns=range(3))

                array_row_parsed = pd.to_numeric(parts.iloc[:, -2], errors='coerce').fillna(-1).astype(int)
                array_col_parsed = pd.to_numeric(parts.iloc[:, -1], errors='coerce').fillna(-1).astype(int)

                need_col = problematic_mask & col_mask
                need_row = problematic_mask & row_mask

                if need_col.any():
                    adata.obs.loc[need_col, 'array_col'] = array_col_parsed.loc[need_col].to_numpy()
                if need_row.any():
                    adata.obs.loc[need_row, 'array_row'] = array_row_parsed.loc[need_row].to_numpy()
            # Ensure columns are integer type
            adata.obs['array_col'] = adata.obs['array_col'].astype(int)
            adata.obs['array_row'] = adata.obs['array_row'].astype(int)

    return adata
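
The coordinate-recovery fallback above can be sketched on its own: take the index up to the first `-`, then read `array_row` and `array_col` from the last two underscore-delimited tokens. The Visium HD-style barcodes below are illustrative examples, not values from a real dataset.

```python
import pandas as pd

# Hypothetical Visium HD-style barcodes: "{prefix}_{row}_{col}-{plex}"
idx = pd.Series(["s_008um_00423_00128-1", "s_008um_00007_00512-1"])

# Base part before '-', then the last two underscore tokens
base = idx.str.split("-", n=1).str[0]
parts = base.str.rsplit("_", n=2, expand=True)

# Coerce to integers, with -1 as the sentinel for unparseable values
array_row = pd.to_numeric(parts.iloc[:, -2], errors="coerce").fillna(-1).astype(int)
array_col = pd.to_numeric(parts.iloc[:, -1], errors="coerce").fillna(-1).astype(int)

print(array_row.tolist(), array_col.tolist())  # [423, 7] [128, 512]
```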

sequence_saturation_curve

sequence_saturation_curve(full_counts, n_points: int = 1000) -> np.ndarray

Compute the sequencing saturation curve.

Parameters:

  • full_counts

    The cell x feature matrix where each count is the number of reads.

  • n_points

    (int, default: 1000 ) –

    The number of subsampling proportions at which to compute the curve.

Returns:

  • ndarray

    The saturation curve.

Source code in src/giftwrap/utils.py
def sequence_saturation_curve(full_counts, n_points: int = 1_000) -> np.ndarray:
    """
    Compute the sequencing saturation curve.
    :param full_counts: The cell x feature matrix where each count is the number of reads.
    :param n_points: The number of subsampling proportions at which to compute the curve.
    :return: The saturation curve.
    """
    # Convert to dense
    if scipy.sparse.issparse(full_counts):
        full_counts = full_counts.toarray()
    full_counts = full_counts.astype(int)

    # Compute the subsampled proportion
    proportions = np.linspace(0.00001, 1, n_points)
    saturations = np.zeros((n_points,2))

    for i, proportion in enumerate(proportions):
        # Randomly subsample the data
        subsampled = np.random.binomial(n=full_counts, p=proportion, size=full_counts.shape)

        # Compute the saturation
        saturation = sequencing_saturation(subsampled)

        # Compute the mean reads/cell
        mean_reads_per_cell = subsampled.sum(axis=1).mean()

        saturations[i,0] = mean_reads_per_cell
        saturations[i,1] = saturation

    return saturations
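
A single point of the curve can be exercised on synthetic data (a sketch; the Poisson rate, matrix shape, and proportion are arbitrary): binomially subsample each read count at proportion p, then measure saturation and mean reads per cell on the subsample.

```python
import numpy as np

rng = np.random.default_rng(0)

# Toy cell x feature read-count matrix (values are illustrative)
full_counts = rng.poisson(lam=5, size=(100, 20))

# Subsample each count at proportion p, as one iteration of the loop does
p = 0.5
subsampled = rng.binomial(n=full_counts, p=p)

# Saturation = 1 - unique combinations / total reads on the subsample
saturation = 1 - (subsampled > 0).sum() / subsampled.sum()
mean_reads_per_cell = subsampled.sum(axis=1).mean()

print(0.0 <= saturation <= 1.0)  # True
```

Sweeping p from near 0 to 1 traces out the full curve: saturation rises toward 1 as mean reads per cell grows, which is the diminishing-returns signal the function is designed to expose.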

sequencing_saturation

sequencing_saturation(counts: ndarray) -> float

Sequencing saturation is 1 - (n_deduped_reads / n_reads), where n_deduped_reads is the number of valid cell barcode/UMI/gene combinations and n_reads is the total number of reads with a valid mapping to a valid cell barcode and UMI.

Parameters:

  • counts

    (ndarray) –

    Counts should be the number of reads rather than UMIs.

Returns:

  • float

    The saturation.

Source code in src/giftwrap/utils.py
def sequencing_saturation(counts: np.ndarray) -> float:
    """
    Sequencing saturation is 1 - (n_deduped_reads / n_reads)
    where n_deduped_reads is the number of valid cell bc/valid umi/gene combinations
    and n_reads is the total number of reads with a valid mapping to a valid cell barcode and umi.
    :param counts: Counts should be the number of reads rather than UMIs.
    :return: The saturation.
    """
    # Number of reads
    n_reads = counts.sum()
    # Number of deduped reads
    n_deduped_reads = (counts > 0).sum()
    return 1 - (n_deduped_reads / n_reads)
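
A worked numeric example of the formula: three nonzero cell barcode/UMI/probe combinations observed across six total reads gives a saturation of 1 - 3/6 = 0.5.

```python
import numpy as np

# Reads per (cell barcode, UMI, probe) combination; toy values
counts = np.array([3, 1, 0, 2])

n_reads = counts.sum()                # 6 total reads
n_deduped_reads = (counts > 0).sum()  # 3 unique combinations

print(1 - n_deduped_reads / n_reads)  # 0.5
```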

GIFTwrap API Documentation

This section provides comprehensive documentation of the giftwrap Python API, including functions and classes available for GIFT-seq data analysis. The API is designed to be user-friendly and integrates seamlessly with scverse-based workflows.

See the GIFTwrap analysis tutorial for a practical guide on using the API to process GIFT-seq data.