Image-Text-to-Text
Transformers
Safetensors
multilingual
internvl_chat
feature-extraction
internvl
custom_code
conversational
Instructions for using OpenGVLab/VisualPRM-8B with libraries, inference providers, notebooks, and local apps. Follow the links below to get started.
- Libraries
- Transformers
How to use OpenGVLab/VisualPRM-8B with Transformers:
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("image-text-to-text", model="OpenGVLab/VisualPRM-8B", trust_remote_code=True)
messages = [
    {
        "role": "user",
        "content": [
            {"type": "image", "url": "https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/p-blog/candy.JPG"},
            {"type": "text", "text": "What animal is on the candy?"}
        ]
    },
]
pipe(text=messages)

# Load model directly
from transformers import AutoModel

model = AutoModel.from_pretrained("OpenGVLab/VisualPRM-8B", trust_remote_code=True, dtype="auto")
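In addition to the pipeline, the repository's remote code exposes InternVL's custom chat() interface and the VisualPRM reward-model methods (defined in the modeling file reproduced below), which expect you to load the tokenizer yourself and place the model on an accelerator. A minimal loading sketch; the bfloat16 precision and .cuda() placement are assumptions, not requirements:

import torch
from transformers import AutoModel, AutoTokenizer

path = "OpenGVLab/VisualPRM-8B"
# trust_remote_code=True is required: the model class is defined by this repo's modeling files.
model = AutoModel.from_pretrained(
    path,
    torch_dtype=torch.bfloat16,  # assumed precision; the reward-model code also uses bfloat16 for its dummy pixels
    trust_remote_code=True,
).eval().cuda()
tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True)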
- Notebooks
- Google Colab
- Kaggle
- Local Apps
- vLLM
How to use OpenGVLab/VisualPRM-8B with vLLM:
Install from pip and serve the model
# Install vLLM from pip:
pip install vllm

# Start the vLLM server:
vllm serve "OpenGVLab/VisualPRM-8B"

# Call the server using curl (OpenAI-compatible API):
curl -X POST "http://localhost:8000/v1/chat/completions" \
    -H "Content-Type: application/json" \
    --data '{
        "model": "OpenGVLab/VisualPRM-8B",
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": "Describe this image in one sentence."
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg"
                        }
                    }
                ]
            }
        ]
    }'
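Because the vLLM server exposes an OpenAI-compatible API, you can also query it from Python instead of curl. A minimal sketch using the official openai client; the localhost:8000 address matches the default `vllm serve` settings above, and the placeholder API key is arbitrary since the local server does not require one by default:

from openai import OpenAI

# Point the client at the local vLLM server started above.
client = OpenAI(base_url="http://localhost:8000/v1", api_key="EMPTY")

completion = client.chat.completions.create(
    model="OpenGVLab/VisualPRM-8B",
    messages=[
        {
            "role": "user",
            "content": [
                {"type": "text", "text": "Describe this image in one sentence."},
                {
                    "type": "image_url",
                    "image_url": {
                        "url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg"
                    },
                },
            ],
        }
    ],
)
print(completion.choices[0].message.content)

The same pattern works against the SGLang server described below by changing the base URL to port 30000.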
Use Docker

docker model run hf.co/OpenGVLab/VisualPRM-8B
- SGLang
How to use OpenGVLab/VisualPRM-8B with SGLang:
Install from pip and serve the model
# Install SGLang from pip:
pip install sglang

# Start the SGLang server:
python3 -m sglang.launch_server \
    --model-path "OpenGVLab/VisualPRM-8B" \
    --host 0.0.0.0 \
    --port 30000

# Call the server using curl (OpenAI-compatible API):
curl -X POST "http://localhost:30000/v1/chat/completions" \
    -H "Content-Type: application/json" \
    --data '{
        "model": "OpenGVLab/VisualPRM-8B",
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": "Describe this image in one sentence."
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg"
                        }
                    }
                ]
            }
        ]
    }'

Use Docker images
docker run --gpus all \
    --shm-size 32g \
    -p 30000:30000 \
    -v ~/.cache/huggingface:/root/.cache/huggingface \
    --env "HF_TOKEN=<secret>" \
    --ipc=host \
    lmsysorg/sglang:latest \
    python3 -m sglang.launch_server \
    --model-path "OpenGVLab/VisualPRM-8B" \
    --host 0.0.0.0 \
    --port 30000

# Call the server using curl (OpenAI-compatible API):
curl -X POST "http://localhost:30000/v1/chat/completions" \
    -H "Content-Type: application/json" \
    --data '{
        "model": "OpenGVLab/VisualPRM-8B",
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": "Describe this image in one sentence."
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": "https://cdn.britannica.com/61/93061-050-99147DCE/Statue-of-Liberty-Island-New-York-Bay.jpg"
                        }
                    }
                ]
            }
        ]
    }'

- Docker Model Runner
How to use OpenGVLab/VisualPRM-8B with Docker Model Runner:
docker model run hf.co/OpenGVLab/VisualPRM-8B
# --------------------------------------------------------
# InternVL
# Copyright (c) 2024 OpenGVLab
# Licensed under The MIT License [see LICENSE for details]
# --------------------------------------------------------
import math
import warnings
from typing import List, Optional, Tuple, Union

import torch
import torch.utils.checkpoint
import transformers
from torch import nn
from torch.nn import CrossEntropyLoss
from transformers import (AutoModel, GenerationConfig, LlamaForCausalLM,
                          LlamaTokenizer)
from transformers.modeling_outputs import CausalLMOutputWithPast
from transformers.modeling_utils import PreTrainedModel
from transformers.utils import ModelOutput, logging

from .configuration_internvl_chat import InternVLChatConfig
from .conversation import get_conv_template
from .modeling_intern_vit import InternVisionModel, has_flash_attn
from .modeling_internlm2 import InternLM2ForCausalLM

logger = logging.get_logger(__name__)

def version_cmp(v1, v2, op='eq'):
    import operator

    from packaging import version
    op_func = getattr(operator, op)
    return op_func(version.parse(v1), version.parse(v2))

class InternVLChatModel(PreTrainedModel):
    config_class = InternVLChatConfig
    main_input_name = 'pixel_values'
    base_model_prefix = 'language_model'
    _supports_flash_attn_2 = True
    _no_split_modules = ['InternVisionModel', 'LlamaDecoderLayer', 'InternLM2DecoderLayer']

    def __init__(self, config: InternVLChatConfig, vision_model=None, language_model=None, use_flash_attn=True):
        super().__init__(config)

        assert version_cmp(transformers.__version__, '4.37.0', 'ge')
        image_size = config.force_image_size or config.vision_config.image_size
        patch_size = config.vision_config.patch_size
        self.patch_size = patch_size
        self.select_layer = config.select_layer
        self.template = config.template
        self.num_image_token = int((image_size // patch_size) ** 2 * (config.downsample_ratio ** 2))
        self.downsample_ratio = config.downsample_ratio
        self.ps_version = config.ps_version
        use_flash_attn = use_flash_attn if has_flash_attn else False
        config.vision_config.use_flash_attn = True if use_flash_attn else False
        config.llm_config.attn_implementation = 'flash_attention_2' if use_flash_attn else 'eager'

        logger.info(f'num_image_token: {self.num_image_token}')
        logger.info(f'ps_version: {self.ps_version}')
        if vision_model is not None:
            self.vision_model = vision_model
        else:
            self.vision_model = InternVisionModel(config.vision_config)
        if language_model is not None:
            self.language_model = language_model
        else:
            if config.llm_config.architectures[0] == 'LlamaForCausalLM':
                self.language_model = LlamaForCausalLM(config.llm_config)
            elif config.llm_config.architectures[0] == 'InternLM2ForCausalLM':
                self.language_model = InternLM2ForCausalLM(config.llm_config)
            else:
                raise NotImplementedError(f'{config.llm_config.architectures[0]} is not implemented.')

        vit_hidden_size = config.vision_config.hidden_size
        llm_hidden_size = config.llm_config.hidden_size

        self.mlp1 = nn.Sequential(
            nn.LayerNorm(vit_hidden_size * int(1 / self.downsample_ratio) ** 2),
            nn.Linear(vit_hidden_size * int(1 / self.downsample_ratio) ** 2, llm_hidden_size),
            nn.GELU(),
            nn.Linear(llm_hidden_size, llm_hidden_size)
        )

        self.img_context_token_id = None
        self.conv_template = get_conv_template(self.template)
        self.system_message = self.conv_template.system_message

    def forward(
            self,
            pixel_values: torch.FloatTensor,
            input_ids: torch.LongTensor = None,
            attention_mask: Optional[torch.Tensor] = None,
            position_ids: Optional[torch.LongTensor] = None,
            image_flags: Optional[torch.LongTensor] = None,
            past_key_values: Optional[List[torch.FloatTensor]] = None,
            labels: Optional[torch.LongTensor] = None,
            use_cache: Optional[bool] = None,
            output_attentions: Optional[bool] = None,
            output_hidden_states: Optional[bool] = None,
            return_dict: Optional[bool] = None,
    ) -> Union[Tuple, CausalLMOutputWithPast]:
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        image_flags = image_flags.squeeze(-1)
        input_embeds = self.language_model.get_input_embeddings()(input_ids).clone()

        vit_embeds = self.extract_feature(pixel_values)
        vit_embeds = vit_embeds[image_flags == 1]
        vit_batch_size = pixel_values.shape[0]

        B, N, C = input_embeds.shape
        input_embeds = input_embeds.reshape(B * N, C)

        if torch.distributed.is_initialized() and torch.distributed.get_rank() == 0:
            print(f'dynamic ViT batch size: {vit_batch_size}, images per sample: {vit_batch_size / B}, dynamic token length: {N}')

        input_ids = input_ids.reshape(B * N)
        selected = (input_ids == self.img_context_token_id)
        try:
            input_embeds[selected] = input_embeds[selected] * 0.0 + vit_embeds.reshape(-1, C)
        except Exception as e:
            vit_embeds = vit_embeds.reshape(-1, C)
            print(f'warning: {e}, input_embeds[selected].shape={input_embeds[selected].shape}, '
                  f'vit_embeds.shape={vit_embeds.shape}')
            n_token = selected.sum()
            input_embeds[selected] = input_embeds[selected] * 0.0 + vit_embeds[:n_token]

        input_embeds = input_embeds.reshape(B, N, C)

        outputs = self.language_model(
            inputs_embeds=input_embeds,
            attention_mask=attention_mask,
            position_ids=position_ids,
            past_key_values=past_key_values,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        logits = outputs.logits

        loss = None
        if labels is not None:
            # Shift so that tokens < n predict n
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()
            # Flatten the tokens
            loss_fct = CrossEntropyLoss()
            shift_logits = shift_logits.view(-1, self.language_model.config.vocab_size)
            shift_labels = shift_labels.view(-1)
            # Enable model parallelism
            shift_labels = shift_labels.to(shift_logits.device)
            loss = loss_fct(shift_logits, shift_labels)

        if not return_dict:
            output = (logits,) + outputs[1:]
            return (loss,) + output if loss is not None else output

        return CausalLMOutputWithPast(
            loss=loss,
            logits=logits,
            past_key_values=outputs.past_key_values,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

    def pixel_shuffle(self, x, scale_factor=0.5):
        n, w, h, c = x.size()
        # N, W, H, C --> N, W, H * scale, C // scale
        x = x.view(n, w, int(h * scale_factor), int(c / scale_factor))
        # N, W, H * scale, C // scale --> N, H * scale, W, C // scale
        x = x.permute(0, 2, 1, 3).contiguous()
        # N, H * scale, W, C // scale --> N, H * scale, W * scale, C // (scale ** 2)
        x = x.view(n, int(h * scale_factor), int(w * scale_factor),
                   int(c / (scale_factor * scale_factor)))
        if self.ps_version == 'v1':
            warnings.warn("In ps_version 'v1', the height and width have not been swapped back, "
                          'which results in a transposed image.')
        else:
            x = x.permute(0, 2, 1, 3).contiguous()
        return x

    def extract_feature(self, pixel_values):
        if self.select_layer == -1:
            vit_embeds = self.vision_model(
                pixel_values=pixel_values,
                output_hidden_states=False,
                return_dict=True).last_hidden_state
        else:
            vit_embeds = self.vision_model(
                pixel_values=pixel_values,
                output_hidden_states=True,
                return_dict=True).hidden_states[self.select_layer]
        vit_embeds = vit_embeds[:, 1:, :]

        h = w = int(vit_embeds.shape[1] ** 0.5)
        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], h, w, -1)
        vit_embeds = self.pixel_shuffle(vit_embeds, scale_factor=self.downsample_ratio)
        vit_embeds = vit_embeds.reshape(vit_embeds.shape[0], -1, vit_embeds.shape[-1])
        vit_embeds = self.mlp1(vit_embeds)
        return vit_embeds

    def batch_chat(self, tokenizer, pixel_values, questions, generation_config, num_patches_list=None,
                   history=None, return_history=False, IMG_START_TOKEN='<img>', IMG_END_TOKEN='</img>',
                   IMG_CONTEXT_TOKEN='<IMG_CONTEXT>', verbose=False, image_counts=None):
        if history is not None or return_history:
            print('Now multi-turn chat is not supported in batch_chat.')
            raise NotImplementedError

        if image_counts is not None:
            num_patches_list = image_counts
            print('Warning: `image_counts` is deprecated. Please use `num_patches_list` instead.')

        img_context_token_id = tokenizer.convert_tokens_to_ids(IMG_CONTEXT_TOKEN)
        self.img_context_token_id = img_context_token_id

        if verbose and pixel_values is not None:
            image_bs = pixel_values.shape[0]
            print(f'dynamic ViT batch size: {image_bs}')

        queries = []
        for idx, num_patches in enumerate(num_patches_list):
            question = questions[idx]
            if pixel_values is not None and '<image>' not in question:
                question = '<image>\n' + question
            template = get_conv_template(self.template)
            template.system_message = self.system_message
            template.append_message(template.roles[0], question)
            template.append_message(template.roles[1], None)
            query = template.get_prompt()

            image_tokens = IMG_START_TOKEN + IMG_CONTEXT_TOKEN * self.num_image_token * num_patches + IMG_END_TOKEN
            query = query.replace('<image>', image_tokens, 1)
            queries.append(query)

        tokenizer.padding_side = 'left'
        model_inputs = tokenizer(queries, return_tensors='pt', padding=True)
        input_ids = model_inputs['input_ids'].to(self.device)
        attention_mask = model_inputs['attention_mask'].to(self.device)
        eos_token_id = tokenizer.convert_tokens_to_ids(template.sep.strip())
        generation_config['eos_token_id'] = eos_token_id
        generation_output = self.generate(
            pixel_values=pixel_values,
            input_ids=input_ids,
            attention_mask=attention_mask,
            **generation_config
        )
        responses = tokenizer.batch_decode(generation_output, skip_special_tokens=True)
        responses = [response.split(template.sep.strip())[0].strip() for response in responses]
        return responses

    def chat(self, tokenizer, pixel_values, question, generation_config, history=None, return_history=False,
             num_patches_list=None, IMG_START_TOKEN='<img>', IMG_END_TOKEN='</img>', IMG_CONTEXT_TOKEN='<IMG_CONTEXT>',
             verbose=False):
        if history is None and pixel_values is not None and '<image>' not in question:
            question = '<image>\n' + question

        if num_patches_list is None:
            num_patches_list = [pixel_values.shape[0]] if pixel_values is not None else []
        assert pixel_values is None or len(pixel_values) == sum(num_patches_list)

        img_context_token_id = tokenizer.convert_tokens_to_ids(IMG_CONTEXT_TOKEN)
        self.img_context_token_id = img_context_token_id

        template = get_conv_template(self.template)
        template.system_message = self.system_message
        eos_token_id = tokenizer.convert_tokens_to_ids(template.sep.strip())

        history = [] if history is None else history
        for (old_question, old_answer) in history:
            template.append_message(template.roles[0], old_question)
            template.append_message(template.roles[1], old_answer)
        template.append_message(template.roles[0], question)
        template.append_message(template.roles[1], None)
        query = template.get_prompt()

        if verbose and pixel_values is not None:
            image_bs = pixel_values.shape[0]
            print(f'dynamic ViT batch size: {image_bs}')

        for num_patches in num_patches_list:
            image_tokens = IMG_START_TOKEN + IMG_CONTEXT_TOKEN * self.num_image_token * num_patches + IMG_END_TOKEN
            query = query.replace('<image>', image_tokens, 1)

        model_inputs = tokenizer(query, return_tensors='pt')
        input_ids = model_inputs['input_ids'].to(self.device)
        attention_mask = model_inputs['attention_mask'].to(self.device)
        generation_config['eos_token_id'] = eos_token_id
        generation_output = self.generate(
            pixel_values=pixel_values,
            input_ids=input_ids,
            attention_mask=attention_mask,
            **generation_config
        )
        response = tokenizer.batch_decode(generation_output, skip_special_tokens=True)[0]
        response = response.split(template.sep.strip())[0].strip()
        history.append((question, response))
        if return_history:
            return response, history
        else:
            query_to_print = query.replace(IMG_CONTEXT_TOKEN, '')
            query_to_print = query_to_print.replace(f'{IMG_START_TOKEN}{IMG_END_TOKEN}', '<image>')
            if verbose:
                print(query_to_print, response)
            return response

    def generate(
            self,
            pixel_values: Optional[torch.FloatTensor] = None,
            input_ids: Optional[torch.FloatTensor] = None,
            attention_mask: Optional[torch.LongTensor] = None,
            visual_features: Optional[torch.FloatTensor] = None,
            generation_config: Optional[GenerationConfig] = None,
            output_hidden_states: Optional[bool] = None,
            **generate_kwargs,
    ) -> torch.LongTensor:
        assert self.img_context_token_id is not None
        if pixel_values is not None:
            if visual_features is not None:
                vit_embeds = visual_features
            else:
                vit_embeds = self.extract_feature(pixel_values)
            input_embeds = self.language_model.get_input_embeddings()(input_ids)
            B, N, C = input_embeds.shape
            input_embeds = input_embeds.reshape(B * N, C)

            input_ids = input_ids.reshape(B * N)
            selected = (input_ids == self.img_context_token_id)
            assert selected.sum() != 0
            input_embeds[selected] = vit_embeds.reshape(-1, C).to(input_embeds.device)

            input_embeds = input_embeds.reshape(B, N, C)
        else:
            input_embeds = self.language_model.get_input_embeddings()(input_ids)

        outputs = self.language_model.generate(
            inputs_embeds=input_embeds,
            attention_mask=attention_mask,
            generation_config=generation_config,
            output_hidden_states=output_hidden_states,
            use_cache=True,
            **generate_kwargs,
        )

        return outputs

class InternVLRewardModel(InternVLChatModel):
    # VisualPRM reward model: reuses the chat model and scores each step of a response
    # via the softmax probability of the '+' vs '-' tokens predicted at a placeholder
    # position after that step (see generate_steps_with_soft_score).

    def split_response(self, response, sep='\n\n', max_steps=None):
        # Split a response into steps; if max_steps is given, merge consecutive
        # steps so that at most max_steps chunks remain.
        steps = response.split(sep)

        if max_steps is not None:
            step = math.ceil(len(steps) / max_steps)
            new_steps = []
            for i in range(0, len(steps), step):
                new_steps.append(sep.join(steps[i:i+step]))
            return new_steps

        return steps

    def join_steps(self, steps, sep='\n\n'):
        return sep.join(steps)

    def find_placeholder_idx(self, tokenizer, input_ids, PLACEHOLDER):
        # TODO: support batch inference
        # Return the token positions whose logits predict the placeholder token,
        # i.e. the last token of each assistant-role header in the prompt.
        input_ids = input_ids[0].tolist()
        template = get_conv_template(self.template)

        idx = []
        bos = tokenizer(template.roles[1], add_special_tokens=False).input_ids
        target = tokenizer(template.roles[1] + PLACEHOLDER + template.sep, add_special_tokens=False).input_ids
        for i in range(len(input_ids)):
            if input_ids[i:i+len(target)] == target:
                assert i + len(bos) - 1 >= 0
                idx.append(i + len(bos) - 1)

        return idx

    def generate_steps_with_soft_score(
        self,
        tokenizer,
        question,
        response,
        pixel_values,
        num_patches_list=None,
        max_steps=None,
        IMG_START_TOKEN='<img>',
        IMG_END_TOKEN='</img>',
        IMG_CONTEXT_TOKEN='<IMG_CONTEXT>',
        PLACEHOLDER=None,
        str2score=None,
    ):
        if str2score is None:
            str2score = {'+': 1, '-': 0}

        if PLACEHOLDER is None:
            PLACEHOLDER = '+'

        if pixel_values is not None and '<image>' not in question:
            num_images = 1 if num_patches_list is None else len(num_patches_list)
            question = '<image>\n' * num_images + question

        if num_patches_list is None:
            num_patches_list = [pixel_values.shape[0]] if pixel_values is not None else []
        assert pixel_values is None or (len(pixel_values) == sum(num_patches_list) and len(num_patches_list) == question.count('<image>')), f'{len(pixel_values)=}, {sum(num_patches_list)=}, {len(num_patches_list)}, {question=}'

        image_input = pixel_values is not None
        if pixel_values is None:
            pixel_values = torch.zeros(1, 3, self.config.vision_config.image_size, self.config.vision_config.image_size).to(self.device).to(torch.bfloat16)

        img_context_token_id = tokenizer.convert_tokens_to_ids(IMG_CONTEXT_TOKEN)
        self.img_context_token_id = img_context_token_id

        candidate_tokens = []
        candidate_weights = []

        if isinstance(response, str):
            steps = self.split_response(response, max_steps=max_steps)
        else:
            steps = response

        # Prepare Query
        for k, v in str2score.items():
            k_id = tokenizer.convert_tokens_to_ids(k)
            assert k_id != tokenizer.unk_token_id
            candidate_tokens.append(k_id)
            candidate_weights.append(v)

        template = get_conv_template(self.template)
        template.system_message = self.system_message

        for step_idx, step in enumerate(steps):
            if step_idx == 0:
                step = f'### Question:\n{question}\n\n### Solution Process:\n{step}'
            template.append_message(template.roles[0], step)
            template.append_message(template.roles[1], PLACEHOLDER)
        query = template.get_prompt()

        for num_patches in num_patches_list:
            image_tokens = IMG_START_TOKEN + IMG_CONTEXT_TOKEN * self.num_image_token * num_patches + IMG_END_TOKEN
            query = query.replace('<image>', image_tokens, 1)

        # Prepare inputs
        model_inputs = tokenizer(query, return_tensors='pt')
        # device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        device = self.device

        input_ids = model_inputs['input_ids'].to(device)
        attention_mask = model_inputs['attention_mask'].to(device)
        image_flags = torch.tensor([image_input] * pixel_values.size(0), dtype=torch.long).to(device)

        # Forward
        idx = self.find_placeholder_idx(tokenizer, input_ids, PLACEHOLDER=PLACEHOLDER)
        logits = self(
            pixel_values=pixel_values,
            input_ids=input_ids,
            attention_mask=attention_mask,
            image_flags=image_flags,
        ).logits
        logits = logits[0][idx, :][:, candidate_tokens]
        soft_scores = logits.softmax(dim=-1).tolist()
        assert len(soft_scores) == len(steps)

        # Gather step scores
        steps_with_score = []
        for soft_score, step in zip(soft_scores, steps):
            score = 0
            for s, w in zip(soft_score, candidate_weights):
                score += s * w
            steps_with_score.append({'step': step, 'score': score})

        return steps_with_score

    def generate_overall_score(self, steps_with_score, func=sum):
        # Aggregate the per-step scores into a single response-level score.
        overall_score = []
        for step in steps_with_score:
            curr_score = step['score']
            overall_score.append(curr_score)
        return func(overall_score)

    def select_best_response(
        self,
        tokenizer,
        question,
        response_list,
        pixel_values=None,
        num_patches_list=None,
        max_steps=12,
        gather_func=None,
        str2score=None,
        return_scores=False,
    ):
        # Best-of-N selection: score each candidate response step by step and
        # rank candidates by their aggregated score (mean per step by default).
        if gather_func is None:
            gather_func = lambda x: sum(x) / len(x)

        sorted_response_list = []
        for response in response_list:
            steps_with_score = self.generate_steps_with_soft_score(
                tokenizer=tokenizer,
                question=question,
                response=response,
                pixel_values=pixel_values,
                num_patches_list=num_patches_list,
                max_steps=max_steps,
                str2score=str2score,
            )
            overall_score = self.generate_overall_score(steps_with_score, func=gather_func)
            sorted_response_list.append((response, overall_score))

        sorted_response_list = sorted(sorted_response_list, key=lambda x: x[1], reverse=True)
        if return_scores:
            return sorted_response_list
        return [item[0] for item in sorted_response_list]

    def check_correctness(
        self,
        tokenizer,
        question,
        response_list,
        pixel_values,
        num_patches_list=None,
        max_steps=12,
        threshold=0.8,
        str2score=None,
    ):
        # Label each step of each response as correct (1) or incorrect (-1)
        # depending on whether its soft score exceeds the threshold.
        correctness_list = []
        for response in response_list:
            steps_with_score = self.generate_steps_with_soft_score(
                tokenizer=tokenizer,
                question=question,
                response=response,
                pixel_values=pixel_values,
                num_patches_list=num_patches_list,
                max_steps=max_steps,
                str2score=str2score,
            )
            correctness = [1 if step_with_score['score'] > threshold else -1 for step_with_score in steps_with_score]
            correctness_list.append(correctness)

        return correctness_list
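
For reference, here is a minimal sketch of how the reward-model methods above might be driven once the model and tokenizer are loaded (as in the Transformers snippet near the top of this page). The question, the two candidate responses, and the text-only setting (pixel_values=None) are illustrative assumptions; for image inputs you would pass preprocessed pixel_values and a matching num_patches_list, as generate_steps_with_soft_score expects.

import torch
from transformers import AutoModel, AutoTokenizer

path = "OpenGVLab/VisualPRM-8B"
model = AutoModel.from_pretrained(path, torch_dtype=torch.bfloat16, trust_remote_code=True).eval().cuda()
tokenizer = AutoTokenizer.from_pretrained(path, trust_remote_code=True)

# Hypothetical example data: a question and two candidate solutions sampled from
# some policy model, each written as '\n\n'-separated reasoning steps.
question = "What is 17 * 24?"
candidates = [
    "17 * 24 = 17 * 20 + 17 * 4 = 340 + 68 = 408.",
    "17 * 24 = 17 * 25 - 17 = 425 - 17 = 418.",
]

with torch.no_grad():
    # Rank candidates by their average per-step score (text-only, so pixel_values=None).
    ranked = model.select_best_response(
        tokenizer=tokenizer,
        question=question,
        response_list=candidates,
        pixel_values=None,
        return_scores=True,
    )
    # Per-step soft scores for the top-ranked candidate.
    steps_with_score = model.generate_steps_with_soft_score(
        tokenizer=tokenizer,
        question=question,
        response=ranked[0][0],
        pixel_values=None,
    )

print(ranked)            # [(response, overall_score), ...], best first
print(steps_with_score)  # [{'step': ..., 'score': ...}, ...]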