| import torch
|
| import torch.nn as nn
|
|
|
| from transformers import PreTrainedModel, PretrainedConfig
|
| from typing import Union
|
|
|
| from .config import MoondreamConfig
|
| from .moondream import MoondreamModel
|
|
|
|
|
| from .image_crops import *
|
| from .vision import *
|
| from .text import *
|
| from .region import *
|
| from .utils import *
|
|
|
|
|
def extract_question(text):
    """Pull the bare question out of a legacy-format prompt.

    A legacy prompt has the exact shape
    ``"<image>\\n\\nQuestion: " + question + "\\n\\nAnswer:"``.

    Args:
        text: The full prompt string to inspect.

    Returns:
        The text between the fixed header and trailer when both are
        present, otherwise ``None``.
    """
    head = "<image>\n\nQuestion: "
    tail = "\n\nAnswer:"

    # Guard clause: anything not wrapped in the exact header/trailer is
    # not a legacy question prompt.
    if not (text.startswith(head) and text.endswith(tail)):
        return None

    return text[len(head) : len(text) - len(tail)]
|
|
|
|
|
class HfConfig(PretrainedConfig):
    """Hugging Face configuration object for the Moondream wrapper.

    The only attribute this class adds is ``config``, a plain dict that
    ``HfMoondream`` hands to ``MoondreamConfig.from_dict``.
    """

    model_type = "moondream1"
    _auto_class = "AutoConfig"

    def __init__(self, **kwargs):
        """Initialise the base config, then reset ``config`` to an empty dict.

        Args:
            **kwargs: Forwarded unchanged to ``PretrainedConfig.__init__``.
        """
        super().__init__(**kwargs)
        # NOTE(review): this assignment runs after the base initialiser, so
        # any ``config`` attribute the base class may have populated from
        # kwargs is replaced by an empty dict -- confirm this is intended.
        self.config = dict()
|
|
|
|
|
class HfMoondream(PreTrainedModel):
    """Hugging Face ``PreTrainedModel`` wrapper around ``MoondreamModel``.

    The wrapped model is constructed with ``setup_caches=False``; its caches
    are created lazily, the first time one of the inference entry points
    (``encode_image``, ``query``, ``caption``, ``detect``, ``point``,
    ``detect_gaze``) is accessed through this wrapper.
    """

    _auto_class = "AutoModelForCausalLM"
    config_class = HfConfig

    def __init__(self, config):
        """Build the wrapped ``MoondreamModel`` from ``config.config``.

        Args:
            config: An ``HfConfig`` whose ``config`` dict is passed to
                ``MoondreamConfig.from_dict``.
        """
        super().__init__(config)
        self.model = MoondreamModel(
            MoondreamConfig.from_dict(config.config), setup_caches=False
        )
        # Tracks whether ``self.model._setup_caches()`` has already run.
        self._is_kv_cache_setup = False

    def _setup_caches(self):
        """Run the wrapped model's cache setup once; later calls are no-ops."""
        if not self._is_kv_cache_setup:
            self.model._setup_caches()
            self._is_kv_cache_setup = True

    # Each property below ensures the caches exist and then returns the
    # wrapped model's bound method, so ``hf_model.query(...)`` is called
    # exactly like ``hf_model.model.query(...)``.

    @property
    def encode_image(self):
        """The wrapped model's ``encode_image`` (caches set up first)."""
        self._setup_caches()
        return self.model.encode_image

    @property
    def query(self):
        """The wrapped model's ``query`` (caches set up first)."""
        self._setup_caches()
        return self.model.query

    @property
    def caption(self):
        """The wrapped model's ``caption`` (caches set up first)."""
        self._setup_caches()
        return self.model.caption

    @property
    def detect(self):
        """The wrapped model's ``detect`` (caches set up first)."""
        self._setup_caches()
        return self.model.detect

    @property
    def point(self):
        """The wrapped model's ``point`` (caches set up first)."""
        self._setup_caches()
        return self.model.point

    @property
    def detect_gaze(self):
        """The wrapped model's ``detect_gaze`` (caches set up first)."""
        self._setup_caches()
        return self.model.detect_gaze

    def answer_question(
        self,
        image_embeds,
        question,
        tokenizer=None,
        chat_history="",
        result_queue=None,
        max_new_tokens=256,
        **kwargs
    ):
        """Answer ``question`` about an image (legacy entry point).

        ``tokenizer``, ``chat_history``, ``max_new_tokens`` and ``kwargs``
        are accepted for backwards compatibility only and are not used.

        Args:
            image_embeds: Image argument passed straight to ``query``.
            question: The question text.
            result_queue: Optional object with a ``put`` method; when given,
                the answer is also pushed onto it.

        Returns:
            The ``"answer"`` entry of the ``query`` result, stripped of
            surrounding whitespace.
        """
        answer = self.query(image_embeds, question)["answer"].strip()

        if result_queue is not None:
            result_queue.put(answer)
        return answer

    def batch_answer(self, images, prompts, tokenizer=None, **kwargs):
        """Answer each prompt about its paired image, one pair at a time.

        ``tokenizer`` and ``kwargs`` are accepted for backwards compatibility
        only and are not used. Pairs are formed with ``zip``, so surplus
        items in the longer input are dropped.

        Returns:
            A list of stripped answer strings, in input order.
        """
        return [
            self.query(image, prompt)["answer"].strip()
            for image, prompt in zip(images, prompts)
        ]

    def _unsupported_exception(self):
        """Raise ``NotImplementedError`` for methods removed from the API."""
        raise NotImplementedError(
            "This method is not supported in the latest version of moondream. "
            "Consider upgrading to the updated API spec, or alternately pin "
            "to 'revision=2024-08-26'."
        )

    def generate(self, image_embeds, prompt, tokenizer, max_new_tokens=128, **kwargs):
        """
        Function definition remains unchanged for backwards compatibility.

        Be aware that tokenizer and kwargs are ignored. max_new_tokens is
        only forwarded when the prompt is not in the legacy
        "<image>\\n\\nQuestion: ...\\n\\nAnswer:" format; prompts in that
        format are answered through `query`, which does not receive it.

        Returns a single-element list containing the generated text.
        """
        prompt_extracted = extract_question(prompt)
        if prompt_extracted is not None:
            # Go through the `query` property (not `self.model.query`) so the
            # lazily created caches exist, as on every other entry point.
            answer = self.query(
                image=image_embeds, question=prompt_extracted, stream=False
            )["answer"]
        else:
            encoded_image = self.encode_image(image_embeds)
            prompt_tokens = torch.tensor(
                [self.model.tokenizer.encode(prompt).ids],
                device=self.device,
            )
            # `_generate_answer` yields text pieces; join them directly.
            answer = "".join(
                self.model._generate_answer(
                    prompt_tokens,
                    encoded_image.kv_cache,
                    encoded_image.pos,
                    max_new_tokens,
                )
            )

        return [answer]

    def get_input_embeddings(self) -> nn.Embedding:
        """
        Lazily wrap the raw parameter `self.model.text.wte` in a real
        `nn.Embedding` layer so that HF mix-ins recognise it. The wrapper
        is built with `nn.Embedding.from_pretrained(..., freeze=True)` on
        first use and cached on `self._input_embeddings` afterwards.
        """
        if not hasattr(self, "_input_embeddings"):
            self._input_embeddings = nn.Embedding.from_pretrained(
                self.model.text.wte,
                freeze=True,
            )
        return self._input_embeddings

    def set_input_embeddings(self, value: Union[nn.Embedding, nn.Module]) -> None:
        """
        Lets HF functions (e.g. `resize_token_embeddings`) replace or resize the
        embeddings and keeps everything tied to `self.model.text.wte`.

        `value.weight` becomes the new `self.model.text.wte`, and `value`
        itself becomes the cached wrapper returned by `get_input_embeddings`.
        """
        self.model.text.wte = value.weight
        self._input_embeddings = value

    def input_embeds(
        self,
        input_ids: Union[torch.LongTensor, list, tuple],
        *,
        # `Union[..., None]` rather than `torch.device | None`: the latter is
        # evaluated at definition time and fails on Python < 3.10.
        device: Union[torch.device, None] = None
    ) -> torch.FloatTensor:
        """
        Back-compat wrapper that turns token IDs into embeddings.

        Non-tensor `input_ids` are converted with `torch.as_tensor`. When
        `device` is given, the IDs are moved there before the lookup;
        otherwise they are used on whatever device they already live on.

        Example:
            ids = torch.tensor([[1, 2, 3]])
            embeds = model.input_embeds(ids)  # (1, 3, hidden_dim)
        """
        if not torch.is_tensor(input_ids):
            input_ids = torch.as_tensor(input_ids)
        if device is not None:
            input_ids = input_ids.to(device)

        return self.get_input_embeddings()(input_ids)
|
|
|