# Source code for openprotein.api.data

import pandas as pd
import openprotein.pydantic as pydantic
from openprotein.pydantic import BaseModel
from typing import Optional, List, Union
from datetime import datetime
from io import BytesIO
from openprotein.errors import APIError
from openprotein.base import APISession
import openprotein.config as config


class AssayMetadata(BaseModel):
    """Metadata describing an uploaded assay dataset."""

    assay_name: str  # human-readable name of the assay
    assay_description: str  # free-text description of the assay
    assay_id: str  # server-assigned unique identifier
    original_filename: str  # filename of the uploaded assay data file
    created_date: datetime  # when the assay was created on the server
    num_rows: int  # number of sequence rows in the dataset
    num_entries: int  # total number of (sequence, measurement) entries
    measurement_names: List[str]  # names of the measurement columns
    sequence_length: Optional[int] = None  # set client-side after upload; may be absent


class AssayDataRow(BaseModel):
    """A single row of assay data: one sequence and its measurements."""

    mut_sequence: str  # the (mutated) protein sequence for this row
    measurement_values: List[Union[float, None]]  # one value per measurement name; None = missing


class AssayDataPage(BaseModel):
    """One page of assay data returned by the paged data endpoint."""

    assaymetadata: AssayMetadata  # metadata of the assay this page belongs to
    page_size: int  # requested page size
    page_offset: int  # offset of this page within the full dataset
    assaydata: List[AssayDataRow]  # the rows contained in this page


def list_models(session: APISession, assay_id: str) -> List:
    """
    List models associated with assay.

    Parameters
    ----------
    session : APISession
        Session object for API communication.
    assay_id : str
        assay ID

    Returns
    -------
    List
        List of models

    Raises
    ------
    APIError
        If an error occurs during the API request.
    """
    endpoint = "v1/models"
    response = session.get(endpoint, params={"assay_id": assay_id})
    # Check the status code for consistency with the other helpers in this
    # module; previously a failed request surfaced as an opaque JSON decode
    # error instead of an APIError.
    if response.status_code == 200:
        return response.json()
    raise APIError(f"Unable to list models: {response.text}")


def assaydata_post(
    session: APISession,
    assay_file,
    assay_name: str,
    assay_description: Optional[str] = "",
) -> AssayMetadata:
    """
    Upload a new assay data file to the service.

    Parameters
    ----------
    session : APISession
        Session object for API communication.
    assay_file : str
        Path to the assay data file.
    assay_name : str
        Name of the assay.
    assay_description : str, optional
        Description of the assay, by default ''.

    Returns
    -------
    AssayMetadata
        Metadata of the posted assay data.

    Raises
    ------
    APIError
        If the upload request fails.
    """
    response = session.post(
        "v1/assaydata",
        files={"assay_data": assay_file},
        data={"assay_name": assay_name, "assay_description": assay_description},
    )
    # Guard clause: surface any failure before attempting to parse the body.
    if response.status_code != 200:
        raise APIError(f"Unable to post assay data: {response.text}")
    return pydantic.parse_obj_as(AssayMetadata, response.json())


def assaydata_list(session: APISession) -> List[AssayMetadata]:
    """
    Get a list of all assay metadata.

    Parameters
    ----------
    session : APISession
        Session object for API communication.

    Returns
    -------
    List[AssayMetadata]
        List of all assay metadata.

    Raises
    ------
    APIError
        If an error occurs during the API request.
    """
    response = session.get("v1/assaydata")
    # Guard clause: fail fast on any non-success status.
    if response.status_code != 200:
        raise APIError(f"Unable to list assay data: {response.text}")
    return pydantic.parse_obj_as(List[AssayMetadata], response.json())


def get_assay_metadata(session: APISession, assay_id: str) -> AssayMetadata:
    """
    Retrieve metadata for a specified assay.

    Parameters
    ----------
    session : APISession
        The current API session for communication with the server.
    assay_id : str
        The identifier of the assay for which metadata is to be retrieved.

    Returns
    -------
    AssayMetadata
        An AssayMetadata that contains the metadata for the specified assay.

    Raises
    ------
    APIError
        If the request fails (including when no assay with the given
        assay_id exists).
    """
    endpoint = "v1/assaydata/metadata"
    response = session.get(endpoint, params={"assay_id": assay_id})
    if response.status_code != 200:
        # Fixed: message previously said "Unable to list assay data".
        raise APIError(f"Unable to get assay metadata: {response.text}")
    # parse_obj_as either returns a populated AssayMetadata or raises a
    # validation error, so the former `data == []` check was dead code.
    return pydantic.parse_obj_as(AssayMetadata, response.json())


def assaydata_put(
    session: APISession,
    assay_id: str,
    assay_name: Optional[str] = None,
    assay_description: Optional[str] = None,
) -> AssayMetadata:
    """
    Update assay metadata.

    Parameters
    ----------
    session : APISession
        Session object for API communication.
    assay_id : str
        Id of the assay.
    assay_name : str, optional
        New name of the assay, by default None.
    assay_description : str, optional
        New description of the assay, by default None.

    Returns
    -------
    AssayMetadata
        Updated metadata of the assay.

    Raises
    ------
    APIError
        If an error occurs during the API request.
    """
    # Only include fields the caller actually wants to change.
    updates = {
        field: value
        for field, value in (
            ("assay_name", assay_name),
            ("assay_description", assay_description),
        )
        if value is not None
    }

    response = session.put(f"v1/assaydata/{assay_id}", data=updates)
    if response.status_code != 200:
        raise APIError(f"Unable to update assay data: {response.text}")
    return pydantic.parse_obj_as(AssayMetadata, response.json())


def assaydata_page_get(
    session: APISession,
    assay_id: str,
    measurement_name: Optional[str] = None,
    page_offset: int = 0,
    page_size: int = 1000,
    data_format: str = "wide",
) -> AssayDataPage:
    """
    Get a page of assay data.

    Parameters
    ----------
    session : APISession
        Session object for API communication.
    assay_id : str
        Id of the assay.
    measurement_name : str, optional
        Name of the measurement, by default None.
    page_offset : int, optional
        Offset of the page, by default 0.
    page_size : int, optional
        Size of the page, by default 1000.
    data_format : str, optional
        data_format of the data, by default 'wide'.

    Returns
    -------
    AssayDataPage
        Page of assay data.

    Raises
    ------
    APIError
        If an error occurs during the API request.
    """
    query = {
        "page_offset": page_offset,
        "page_size": page_size,
        "format": data_format,
    }
    # measurement_name is optional server-side; omit it entirely when unset.
    if measurement_name is not None:
        query["measurement_name"] = measurement_name

    response = session.get(f"v1/assaydata/{assay_id}", params=query)
    if response.status_code != 200:
        raise APIError(f"Unable to get assay data page: {response.text}")
    return pydantic.parse_obj_as(AssayDataPage, response.json())


class AssayDataset:
    """Future Job for manipulating results"""

    def __init__(self, session: APISession, metadata: AssayMetadata):
        """
        init for AssayDataset.

        Parameters
        ----------
        session : APISession
            Session object for API communication.
        metadata : AssayMetadata
            Metadata object of the assay data.
        """
        self.session = session
        self.metadata = metadata
        # Respect the configured page size, but never exceed the server's
        # maximum page size of 1000.
        self.page_size = min(config.BASE_PAGE_SIZE, 1000)

    def __str__(self) -> str:
        return str(self.metadata)

    def __repr__(self) -> str:
        return repr(self.metadata)

    @property
    def id(self):
        # Server-assigned unique identifier of the assay.
        return self.metadata.assay_id

    @property
    def name(self):
        return self.metadata.assay_name

    @property
    def description(self):
        return self.metadata.assay_description

    @property
    def measurement_names(self):
        return self.metadata.measurement_names

    @property
    def sequence_length(self):
        return self.metadata.sequence_length

    def __len__(self):
        return self.metadata.num_rows

    @property
    def shape(self):
        # Rows x (sequence column + one column per measurement).
        return (len(self), len(self.measurement_names) + 1)

    def list_models(self):
        """
        List models associated with assay.

        Returns
        -------
        List
            List of models
        """
        return list_models(self.session, self.id)

    def update(
        self, assay_name: Optional[str] = None, assay_description: Optional[str] = None
    ) -> None:
        """
        Update the assay metadata.

        Parameters
        ----------
        assay_name : str, optional
            New name of the assay, by default None.
        assay_description : str, optional
            New description of the assay, by default None.

        Returns
        -------
        None
        """
        metadata = assaydata_put(
            self.session,
            self.id,
            assay_name=assay_name,
            assay_description=assay_description,
        )
        self.metadata = metadata

    def _get_all(self, verbose: bool = False) -> pd.DataFrame:
        """
        Get all assay data, paging through the dataset until exhausted.

        Parameters
        ----------
        verbose : bool, optional
            If True, print a message when a page request fails.

        Returns
        -------
        pd.DataFrame
            Dataframe containing all assay data fetched so far (best effort:
            a failing page stops fetching and returns what was collected).
        """
        step = self.page_size
        results = []
        num_returned = step
        offset = 0
        # A page shorter than `step` means we've reached the end of the data.
        while num_returned >= step:
            try:
                result = self.get_slice(offset, offset + step)
                results.append(result)
                num_returned = len(result)
                offset += num_returned
            except APIError as exc:
                if verbose:
                    print(f"Failed to get results: {exc}")
                break
        if not results:
            # Fixed: pd.concat([]) raised ValueError when the very first page
            # failed; return an empty, correctly-shaped frame instead.
            return pd.DataFrame(columns=["sequence"] + self.measurement_names)
        return pd.concat(results)

    def get_first(self) -> pd.DataFrame:
        """
        Get head slice of assay data.

        Returns
        -------
        pd.DataFrame
            Dataframe containing the slice of assay data.
        """
        rows = []
        entries = assaydata_page_get(self.session, self.id, page_offset=0, page_size=1)
        for row in entries.assaydata:
            rows.append([row.mut_sequence] + row.measurement_values)
        return pd.DataFrame(rows, columns=["sequence"] + self.measurement_names)

    def get_slice(self, start: int, end: int) -> pd.DataFrame:
        """
        Get a slice of assay data.

        Parameters
        ----------
        start : int
            Start index of the slice.
        end : int
            End index of the slice.

        Returns
        -------
        pd.DataFrame
            Dataframe containing the slice of assay data.
        """
        rows = []
        page_size = self.page_size
        # Loop over the requested range one page at a time.
        for i in range(start, end, page_size):
            # The last page might be smaller than the page size.
            current_page_size = min(page_size, end - i)
            entries = assaydata_page_get(
                self.session, self.id, page_offset=i, page_size=current_page_size
            )
            for row in entries.assaydata:
                rows.append([row.mut_sequence] + row.measurement_values)
        return pd.DataFrame(rows, columns=["sequence"] + self.measurement_names)
class DataAPI:
    """API interface for calling AssayData endpoints"""

    def __init__(self, session: APISession):
        """
        init the DataAPI.

        Parameters
        ----------
        session : APISession
            Session object for API communication.
        """
        self.session = session

    def list(self) -> List[AssayDataset]:
        """
        List all assay datasets.

        Returns
        -------
        List[AssayDataset]
            List of all assay datasets.
        """
        metadata = assaydata_list(self.session)
        return [AssayDataset(self.session, x) for x in metadata]

    def create(
        self, table: pd.DataFrame, name: str, description: Optional[str] = None
    ) -> AssayDataset:
        """
        Create a new assay dataset.

        Parameters
        ----------
        table : pd.DataFrame
            DataFrame containing the assay data; must include a 'sequence'
            column.
        name : str
            Name of the assay dataset.
        description : str, optional
            Description of the assay dataset, by default None.

        Returns
        -------
        AssayDataset
            Created assay dataset.
        """
        stream = BytesIO()
        table.to_csv(stream, index=False)
        stream.seek(0)
        metadata = assaydata_post(
            self.session, stream, name, assay_description=description
        )
        # The server does not report sequence length; infer it client-side
        # from the first row of the uploaded table.
        metadata.sequence_length = len(table["sequence"].values[0])
        return AssayDataset(self.session, metadata)

    def get(self, assay_id: str, verbose: bool = False) -> AssayMetadata:
        """
        Get metadata for an assay dataset by its ID.

        Parameters
        ----------
        assay_id : str
            ID of the assay dataset.
        verbose : bool, optional
            Unused; kept for backward compatibility.

        Returns
        -------
        AssayMetadata
            Metadata of the assay dataset with the specified ID.

        Raises
        ------
        APIError
            If the request fails or no assay with the given ID is found.
        """
        # NOTE: docstring previously claimed this returns an AssayDataset and
        # raises KeyError; it returns AssayMetadata and propagates APIError.
        return get_assay_metadata(self.session, assay_id)

    def load_assay(self, assay_id: str) -> AssayDataset:
        """
        Reload a submitted assay dataset to resume from where you left off!

        Parameters
        ----------
        assay_id : str
            The identifier of the assay whose details are to be loaded.

        Returns
        -------
        AssayDataset
            Dataset wrapper around the assay's metadata.

        Raises
        ------
        HTTPError
            If the request to the server fails.
        """
        metadata = self.get(assay_id)
        return AssayDataset(
            self.session,
            metadata,
        )

    def __len__(self) -> int:
        """
        Get the number of assay datasets.

        Returns
        -------
        int
            Number of assay datasets.
        """
        return len(self.list())