# Source code for openprotein.embeddings.future
"""Future for embeddings-related jobs."""
from collections import namedtuple
from typing import Generator
import numpy as np
from openprotein import config
"""Embeddings results represented as futures."""
from openprotein.base import APISession
from openprotein.jobs import Future, MappedFuture, StreamingFuture
from . import api
from .schemas import (
AttnJob,
EmbeddingsJob,
GenerateJob,
JobType,
LogitsJob,
ScoreIndelJob,
ScoreJob,
ScoreSingleSiteJob,
)
[docs]
class EmbeddingsResultFuture(MappedFuture, Future):
    """Mapped future over the per-sequence results of an embeddings-type job.

    The keys of the mapping are the request's sequences (see :meth:`keys`);
    :meth:`get_item` fetches and decodes the result for a single sequence.
    """

    job: EmbeddingsJob | AttnJob | LogitsJob

    def __init__(
        self,
        session: APISession,
        job: EmbeddingsJob | AttnJob | LogitsJob,
        sequences: list[bytes] | list[str] | None = None,
        max_workers: int = config.MAX_CONCURRENT_WORKERS,
    ):
        """
        Create the future for an already-submitted job.

        Args:
            session (APISession): session used for all API requests
            job: the job whose results this future exposes
            sequences: sequences of the request, if already known; when
                omitted they are fetched from the API on first access
            max_workers (int): forwarded unchanged to the parent initializer
        """
        super().__init__(session=session, job=job, max_workers=max_workers)
        # Store every known sequence as bytes so the keys share one type.
        if sequences is None:
            self._sequences = None
        else:
            self._sequences = [
                seq.encode() if isinstance(seq, str) else seq for seq in sequences
            ]

    def stream(self):
        """Return the embeddings result stream requested from the API for this job."""
        return api.request_get_embeddings_stream(
            session=self.session,
            job_id=self.id,
        )

    def get(self, verbose=False) -> list[np.ndarray]:
        """Delegate to the parent ``get``; only the return annotation is narrowed."""
        return super().get(verbose=verbose)

    @property
    def sequences(self) -> list[bytes] | list[str]:
        """Sequences of the request, fetched from the API once and then cached."""
        if self._sequences is not None:
            return self._sequences
        self._sequences = api.get_request_sequences(
            session=self.session,
            job_id=self.job.job_id,
            job_type=self.job.job_type,
        )
        return self._sequences

    @property
    def id(self):
        """Identifier of the underlying job (``job.job_id``)."""
        return self.job.job_id

    def keys(self):
        """Return the keys of this mapped future, i.e. the request's sequences."""
        return self.sequences

    def get_item(self, sequence: bytes) -> np.ndarray:
        """
        Fetch and decode the result for one sequence.

        Args:
            sequence (bytes): sequence to fetch results for

        Returns:
            np.ndarray: decoded result for the sequence
        """
        raw = api.request_get_sequence_result(
            session=self.session,
            job_id=self.job.job_id,
            sequence=sequence,
            job_type=self.job.job_type,
        )
        decoded = api.result_decode(raw)
        return decoded
[docs]
class EmbeddingsScoreFuture(StreamingFuture, Future):
    """Future for manipulating results for embeddings score-related requests.

    Results are consumed as a stream of rows; :meth:`stream` turns each row
    into a ``Score`` namedtuple whose last field is a numpy array of scores.
    """

    job: ScoreJob | ScoreIndelJob | ScoreSingleSiteJob

    def __init__(
        self,
        session: APISession,
        job: ScoreJob | ScoreIndelJob | ScoreSingleSiteJob | GenerateJob,
        sequences: list[bytes] | list[str] | None = None,
    ):
        """
        Create the future for an already-submitted job.

        Args:
            session (APISession): session used for all API requests
            job: the score (or generate) job whose results are streamed
            sequences: sequences of the request, if already known; when
                omitted they are fetched from the API on first access
        """
        super().__init__(session=session, job=job)
        self._sequences = sequences

    @property
    def sequences(self) -> list[bytes] | list[str]:
        """Sequences of the request, fetched from the API once and then cached."""
        if self._sequences is None:
            self._sequences = api.get_request_sequences(self.session, self.job.job_id)
        return self._sequences

    def stream(self) -> Generator:
        """
        Yield one ``Score`` namedtuple per result row.

        The first row of the underlying stream is treated as the header. All
        columns before the first one whose name starts with ``"score"`` become
        individual fields of the namedtuple; the remaining columns are
        converted to floats and combined into a single numpy array stored in
        the ``score`` field.

        Yields:
            Score: namedtuple of the leading columns plus a ``score`` array

        Notes:
            An empty result stream (no header row) yields nothing.
        """
        if self.job_type == JobType.poet_generate:
            stream = api.request_get_generate_result(
                session=self.session, job_id=self.id
            )
        else:
            stream = api.request_get_score_result(session=self.session, job_id=self.id)

        # mut_code, ... for ssp
        # name, sequence, ... for score
        # A bare next() here would leak StopIteration out of this generator on
        # an empty stream, which Python re-raises as RuntimeError (PEP 479).
        header = next(stream, None)
        if header is None:
            return

        # Index of the first score column; stays 0 when no column name starts
        # with "score", in which case every column is treated as a score.
        score_start_index = next(
            (i for i, col_name in enumerate(header) if col_name.startswith("score")),
            0,
        )
        Score = namedtuple("Score", header[:score_start_index] + ["score"])

        for line in stream:
            # combine scores into numpy array
            scores = np.array([float(s) for s in line[score_start_index:]])
            yield Score(*line[:score_start_index], scores)
[docs]
class EmbeddingsGenerateFuture(EmbeddingsScoreFuture, StreamingFuture, Future):
    """Streaming future for generate jobs; listing sequences is not supported."""

    job: GenerateJob

    @property
    def sequences(self):
        """Always raises, since a generate job has no sequences to list.

        Raises:
            Exception: on every access
        """
        message = "generate job does not support listing sequences"
        raise Exception(message)