Source code for paddlenlp.metrics.squad

# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
# Copyright 2018 The Google AI Language Team Authors and The HuggingFace Inc. team.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import re
import string
import json
import numpy as np

[docs]def compute_prediction(examples, features, predictions, version_2_with_negative: bool=False, n_best_size: int=20, max_answer_length: int=30, null_score_diff_threshold: float=0.0): """ Post-processes the predictions of a question-answering model to convert them to answers that are substrings of the original contexts. This is the base postprocessing functions for models that only return start and end logits. Args: examples: The non-preprocessed dataset (see the main script for more information). features: The processed dataset (see the main script for more information). predictions (:obj:`Tuple[np.ndarray, np.ndarray]`): The predictions of the model: two arrays containing the start logits and the end logits respectively. Its first dimension must match the number of elements of :obj:`features`. version_2_with_negative (:obj:`bool`, `optional`, defaults to :obj:`False`): Whether or not the underlying dataset contains examples with no answers. n_best_size (:obj:`int`, `optional`, defaults to 20): The total number of n-best predictions to generate when looking for an answer. max_answer_length (:obj:`int`, `optional`, defaults to 30): The maximum length of an answer that can be generated. This is needed because the start and end predictions are not conditioned on one another. null_score_diff_threshold (:obj:`float`, `optional`, defaults to 0): The threshold used to select the null answer: if the best answer has a score that is less than the score of the null answer minus this threshold, the null answer is selected for this example (note that the score of the null answer for an example giving several features is the minimum of the scores for the null answer on each feature: all features must be aligned on the fact they `want` to predict a null answer). Only useful when :obj:`version_2_with_negative` is :obj:`True`. """ assert len( predictions ) == 2, "`predictions` should be a tuple with two elements (start_logits, end_logits)." all_start_logits, all_end_logits = predictions assert len(predictions[0]) == len( features), "Number of predictions should be equal to number of features." # Build a map example to its corresponding features. features_per_example = collections.defaultdict(list) for i, feature in enumerate(features): features_per_example[feature["example_id"]].append(i) # The dictionaries we have to fill. all_predictions = collections.OrderedDict() all_nbest_json = collections.OrderedDict() scores_diff_json = collections.OrderedDict() # Let's loop over all the examples! for example_index, example in enumerate(examples): # Those are the indices of the features associated to the current example. feature_indices = features_per_example[example['id']] min_null_prediction = None prelim_predictions = [] # Looping through all the features associated to the current example. for feature_index in feature_indices: # We grab the predictions of the model for this feature. start_logits = all_start_logits[feature_index] end_logits = all_end_logits[feature_index] # This is what will allow us to map some the positions in our logits to span of texts in the original # context. offset_mapping = features[feature_index]["offset_mapping"] # Optional `token_is_max_context`, if provided we will remove answers that do not have the maximum context # available in the current feature. token_is_max_context = features[feature_index].get( "token_is_max_context", None) # Update minimum null prediction. feature_null_score = start_logits[0] + end_logits[0] if min_null_prediction is None or min_null_prediction[ "score"] > feature_null_score: min_null_prediction = { "offsets": (0, 0), "score": feature_null_score, "start_logit": start_logits[0], "end_logit": end_logits[0], } # Go through all possibilities for the `n_best_size` greater start and end logits. start_indexes = np.argsort(start_logits)[-1:-n_best_size - 1: -1].tolist() end_indexes = np.argsort(end_logits)[-1:-n_best_size - 1:-1].tolist( ) for start_index in start_indexes: for end_index in end_indexes: # Don't consider out-of-scope answers, either because the indices are out of bounds or correspond # to part of the input_ids that are not in the context. if (start_index >= len(offset_mapping) or end_index >= len(offset_mapping) or offset_mapping[start_index] is None or offset_mapping[end_index] is None or offset_mapping[start_index] == (0, 0) or offset_mapping[end_index] == (0, 0)): continue # Don't consider answers with a length that is either < 0 or > max_answer_length. if end_index < start_index or end_index - start_index + 1 > max_answer_length: continue # Don't consider answer that don't have the maximum context available (if such information is # provided). if token_is_max_context is not None and not token_is_max_context.get( str(start_index), False): continue prelim_predictions.append({ "offsets": (offset_mapping[start_index][0], offset_mapping[end_index][1]), "score": start_logits[start_index] + end_logits[end_index], "start_logit": start_logits[start_index], "end_logit": end_logits[end_index], }) if version_2_with_negative: # Add the minimum null prediction prelim_predictions.append(min_null_prediction) null_score = min_null_prediction["score"] # Only keep the best `n_best_size` predictions. predictions = sorted( prelim_predictions, key=lambda x: x["score"], reverse=True)[:n_best_size] # Add back the minimum null prediction if it was removed because of its low score. if version_2_with_negative and not any(p["offsets"] == (0, 0) for p in predictions): predictions.append(min_null_prediction) # Use the offsets to gather the answer text in the original context. context = example["context"] for pred in predictions: offsets = pred.pop("offsets") pred["text"] = context[offsets[0]:offsets[1]] # In the very rare edge case we have not a single non-null prediction, we create a fake prediction to avoid # failure. if len(predictions) == 0 or (len(predictions) == 1 and predictions[0]["text"] == ""): predictions.insert(0, { "text": "empty", "start_logit": 0.0, "end_logit": 0.0, "score": 0.0 }) # Compute the softmax of all scores (we do it with numpy to stay independent from torch/tf in this file, using # the LogSumExp trick). scores = np.array([pred.pop("score") for pred in predictions]) exp_scores = np.exp(scores - np.max(scores)) probs = exp_scores / exp_scores.sum() # Include the probabilities in our predictions. for prob, pred in zip(probs, predictions): pred["probability"] = prob # Pick the best prediction. If the null answer is not possible, this is easy. if not version_2_with_negative: all_predictions[example["id"]] = predictions[0]["text"] else: # Otherwise we first need to find the best non-empty prediction. i = 0 while predictions[i]["text"] == "": i += 1 best_non_null_pred = predictions[i] # Then we compare to the null prediction using the threshold. score_diff = null_score - best_non_null_pred[ "start_logit"] - best_non_null_pred["end_logit"] scores_diff_json[example["id"]] = float( score_diff) # To be JSON-serializable. if score_diff > null_score_diff_threshold: all_predictions[example["id"]] = "" else: all_predictions[example["id"]] = best_non_null_pred["text"] # Make `predictions` JSON-serializable by casting np.float back to float. all_nbest_json[example["id"]] = [{ k: (float(v) if isinstance(v, (np.float16, np.float32, np.float64)) else v) for k, v in pred.items() } for pred in predictions] return all_predictions, all_nbest_json, scores_diff_json
def make_qid_to_has_ans(examples): qid_to_has_ans = {} for example in examples: qid_to_has_ans[example['id']] = not example.get('is_impossible', False) return qid_to_has_ans
[docs]def normalize_answer(s): """Lower text and remove punctuation, articles and extra whitespace.""" def remove_articles(text): regex = re.compile(r'\b(a|an|the)\b', re.UNICODE) return re.sub(regex, ' ', text) def white_space_fix(text): return ' '.join(text.split()) def remove_punc(text): exclude = set(string.punctuation) return ''.join(ch for ch in text if ch not in exclude) def lower(text): return text.lower() if not s: return '' else: return white_space_fix(remove_articles(remove_punc(lower(s))))
def compute_exact(a_gold, a_pred): return int(normalize_answer(a_gold) == normalize_answer(a_pred)) def compute_f1(a_gold, a_pred, is_whitespace_splited=True): gold_toks = normalize_answer(a_gold).split() pred_toks = normalize_answer(a_pred).split() if not is_whitespace_splited: gold_toks = gold_toks[0] if gold_toks else "" pred_toks = pred_toks[0] if pred_toks else "" common = collections.Counter(gold_toks) & collections.Counter(pred_toks) num_same = sum(common.values()) if len(gold_toks) == 0 or len(pred_toks) == 0: # If either is no-answer, then F1 is 1 if they agree, 0 otherwise return int(gold_toks == pred_toks) if num_same == 0: return 0 precision = 1.0 * num_same / len(pred_toks) recall = 1.0 * num_same / len(gold_toks) f1 = (2 * precision * recall) / (precision + recall) return f1 def get_raw_scores(examples, preds, is_whitespace_splited=True): exact_scores = {} f1_scores = {} for example in examples: qid = example['id'] gold_answers = [ text for text in example['answers'] if normalize_answer(text) ] if not gold_answers: # For unanswerable questions, only correct answer is empty string gold_answers = [''] if qid not in preds: print('Missing prediction for %s' % qid) continue a_pred = preds[qid] # Take max over all gold answers exact_scores[qid] = max(compute_exact(a, a_pred) for a in gold_answers) f1_scores[qid] = max( compute_f1(a, a_pred, is_whitespace_splited) for a in gold_answers) return exact_scores, f1_scores def apply_no_ans_threshold(scores, na_probs, qid_to_has_ans, na_prob_thresh): new_scores = {} for qid, s in scores.items(): pred_na = na_probs[qid] > na_prob_thresh if pred_na: new_scores[qid] = float(not qid_to_has_ans[qid]) else: new_scores[qid] = s return new_scores def make_eval_dict(exact_scores, f1_scores, qid_list=None): if not qid_list: total = len(exact_scores) return collections.OrderedDict([ ('exact', 100.0 * sum(exact_scores.values()) / total), ('f1', 100.0 * sum(f1_scores.values()) / total), ('total', total), ]) else: total = len(qid_list) return collections.OrderedDict([ ('exact', 100.0 * sum(exact_scores[k] for k in qid_list) / total), ('f1', 100.0 * sum(f1_scores[k] for k in qid_list) / total), ('total', total), ]) def merge_eval(main_eval, new_eval, prefix): for k in new_eval: main_eval['%s_%s' % (prefix, k)] = new_eval[k] def find_best_thresh(preds, scores, na_probs, qid_to_has_ans): num_no_ans = sum(1 for k in qid_to_has_ans if not qid_to_has_ans[k]) cur_score = num_no_ans best_score = cur_score best_thresh = 0.0 qid_list = sorted(na_probs, key=lambda k: na_probs[k]) for i, qid in enumerate(qid_list): if qid not in scores: continue if qid_to_has_ans[qid]: diff = scores[qid] else: if preds[qid]: diff = -1 else: diff = 0 cur_score += diff if cur_score > best_score: best_score = cur_score best_thresh = na_probs[qid] return 100.0 * best_score / len(scores), best_thresh def find_all_best_thresh(main_eval, preds, exact_raw, f1_raw, na_probs, qid_to_has_ans): best_exact, exact_thresh = find_best_thresh(preds, exact_raw, na_probs, qid_to_has_ans) best_f1, f1_thresh = find_best_thresh(preds, f1_raw, na_probs, qid_to_has_ans) main_eval['best_exact'] = best_exact main_eval['best_exact_thresh'] = exact_thresh main_eval['best_f1'] = best_f1 main_eval['best_f1_thresh'] = f1_thresh def squad_evaluate(examples, preds, na_probs=None, na_prob_thresh=1.0, is_whitespace_splited=True): if not na_probs: na_probs = {k: 0.0 for k in preds} qid_to_has_ans = make_qid_to_has_ans(examples) # maps qid to True/False has_ans_qids = [k for k, v in qid_to_has_ans.items() if v] no_ans_qids = [k for k, v in qid_to_has_ans.items() if not v] exact_raw, f1_raw = get_raw_scores(examples, preds, is_whitespace_splited) exact_thresh = apply_no_ans_threshold(exact_raw, na_probs, qid_to_has_ans, na_prob_thresh) f1_thresh = apply_no_ans_threshold(f1_raw, na_probs, qid_to_has_ans, na_prob_thresh) out_eval = make_eval_dict(exact_thresh, f1_thresh) if has_ans_qids: has_ans_eval = make_eval_dict( exact_thresh, f1_thresh, qid_list=has_ans_qids) merge_eval(out_eval, has_ans_eval, 'HasAns') if no_ans_qids: no_ans_eval = make_eval_dict( exact_thresh, f1_thresh, qid_list=no_ans_qids) merge_eval(out_eval, no_ans_eval, 'NoAns') find_all_best_thresh(out_eval, preds, exact_raw, f1_raw, na_probs, qid_to_has_ans) print(json.dumps(out_eval, indent=2))