# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
from typing import Optional, Tuple
from collections import OrderedDict
import paddle
import paddle.nn as nn
import paddle.tensor as tensor
import paddle.nn.functional as F
from .. import PretrainedModel, register_base_model
# Names exported by ``from <this module> import *``.
__all__ = [
    'ElectraModel',
    'ElectraPretrainedModel',
    'ElectraForTotalPretraining',
    'ElectraDiscriminator',
    'ElectraGenerator',
    'ElectraClassificationHead',
    'ElectraForSequenceClassification',
    'ElectraForTokenClassification',
    'ElectraPretrainingCriterion',
]
def get_activation(activation_string):
    """Look up an activation callable by name in ``ACT2FN``.

    Args:
        activation_string: Key identifying the activation, e.g. ``"gelu"``.

    Returns:
        The callable stored in ``ACT2FN`` under ``activation_string``.

    Raises:
        KeyError: If ``activation_string`` is not a key of ``ACT2FN``.
    """
    # Guard clause: reject unknown names first, listing the valid ones.
    if activation_string not in ACT2FN:
        raise KeyError("function {} not found in ACT2FN mapping {}".format(
            activation_string, list(ACT2FN.keys())))
    return ACT2FN[activation_string]
def mish(x):
    """Mish activation: ``x * tanh(softplus(x))``."""
    softplus_x = F.softplus(x)
    return x * F.tanh(softplus_x)
def linear_act(x):
    """Identity activation: hand the input back untouched."""
    unchanged = x
    return unchanged
def swish(x):
    """Swish activation: ``x * sigmoid(x)``."""
    gate = F.sigmoid(x)
    return x * gate
# Registry of activation name -> callable, consulted by get_activation().
ACT2FN = dict(
    relu=F.relu,
    gelu=F.gelu,
    tanh=F.tanh,
    sigmoid=F.sigmoid,
    mish=mish,
    linear=linear_act,
    swish=swish,
)
class ElectraEmbeddings(nn.Layer):
    """Construct the embeddings from word, position and token_type embeddings.

    The three lookups are summed, layer-normalized and passed through dropout.
    """

    def __init__(self, vocab_size, embedding_size, hidden_dropout_prob,
                 max_position_embeddings, type_vocab_size):
        """Create the three embedding tables plus LayerNorm and dropout.

        Args:
            vocab_size: Number of rows in the word embedding table.
            embedding_size: Width of every embedding vector.
            hidden_dropout_prob: Dropout probability applied to the output.
            max_position_embeddings: Number of rows in the position table.
            type_vocab_size: Number of rows in the token-type table.
        """
        super().__init__()
        self.word_embeddings = nn.Embedding(vocab_size, embedding_size)
        self.position_embeddings = nn.Embedding(max_position_embeddings,
                                                embedding_size)
        self.token_type_embeddings = nn.Embedding(type_vocab_size,
                                                  embedding_size)
        self.layer_norm = nn.LayerNorm(embedding_size, epsilon=1e-12)
        self.dropout = nn.Dropout(hidden_dropout_prob)

    def forward(self, input_ids, token_type_ids=None, position_ids=None):
        """Embed ``input_ids`` and return the normalized, dropped-out sum.

        Args:
            input_ids: Integer tensor of token ids.
            token_type_ids: Optional token-type ids; all zeros when omitted.
            position_ids: Optional position ids; when omitted they count up
                from 0 along the last axis of ``input_ids``.

        Returns:
            The embedding tensor after LayerNorm and dropout.
        """
        if position_ids is None:
            # A running sum of ones along the last axis gives 1, 2, 3, ...;
            # subtracting the ones again shifts it to 0, 1, 2, ...
            unit = paddle.ones_like(input_ids, dtype="int64")
            position_ids = paddle.cumsum(unit, axis=-1) - unit
            position_ids.stop_gradient = True

        if token_type_ids is None:
            token_type_ids = paddle.zeros_like(input_ids, dtype="int64")

        summed = (self.word_embeddings(input_ids) +
                  self.position_embeddings(position_ids) +
                  self.token_type_embeddings(token_type_ids))
        return self.dropout(self.layer_norm(summed))
class ElectraDiscriminatorPredictions(nn.Layer):
    """Prediction layer for the discriminator, made up of two dense layers."""

    def __init__(self, hidden_size, hidden_act):
        """Build the two dense layers and resolve the activation.

        Args:
            hidden_size: Width of the incoming hidden states.
            hidden_act: Activation name resolved through ``get_activation``.
        """
        super(ElectraDiscriminatorPredictions, self).__init__()
        self.dense = nn.Linear(hidden_size, hidden_size)
        self.dense_prediction = nn.Linear(hidden_size, 1)
        self.act = get_activation(hidden_act)

    def forward(self, discriminator_hidden_states):
        """Return one logit per position.

        Args:
            discriminator_hidden_states: Hidden states whose last axis has
                ``hidden_size`` entries.

        Returns:
            The logits with the trailing size-1 axis removed.
        """
        hidden_states = self.dense(discriminator_hidden_states)
        hidden_states = self.act(hidden_states)
        # Drop only the trailing size-1 axis produced by ``dense_prediction``.
        # A bare ``squeeze()`` would also remove a batch (or sequence) axis of
        # size 1 and silently change the rank of the result.
        logits = self.dense_prediction(hidden_states).squeeze(axis=-1)
        return logits
class ElectraGeneratorPredictions(nn.Layer):
    """Prediction layer for the generator, made up of two dense layers."""

    def __init__(self, embedding_size, hidden_size, hidden_act):
        """Build the projection, activation and LayerNorm.

        Args:
            embedding_size: Output width of the dense projection and the
                normalized axis size of the LayerNorm.
            hidden_size: Width of the incoming hidden states.
            hidden_act: Activation name resolved through ``get_activation``.
        """
        super().__init__()
        self.layer_norm = nn.LayerNorm(embedding_size)
        self.dense = nn.Linear(hidden_size, embedding_size)
        self.act = get_activation(hidden_act)

    def forward(self, generator_hidden_states):
        """Project, activate, then layer-normalize the hidden states."""
        projected = self.dense(generator_hidden_states)
        activated = self.act(projected)
        return self.layer_norm(activated)
class ElectraPretrainedModel(PretrainedModel):
    """
    An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained
    models.
    """
    base_model_prefix = "electra"
    model_config_file = "model_config.json"

    # pretrained general configuration
    gen_weight = 1.0
    disc_weight = 50.0
    # Read by ElectraGenerator: when True the LM head reuses the input word
    # embedding matrix instead of owning a separate Linear layer.
    tie_word_embeddings = True
    # Read by ElectraForTotalPretraining: when False the generator and
    # discriminator word embeddings are exposed for weight tying.
    untied_generator_embeddings = False
    # Read by ElectraForTotalPretraining.forward and handed to the sampler.
    use_softmax_sample = True

    # model init configuration
    pretrained_init_configuration = {
        "electra-small": {
            "attention_probs_dropout_prob": 0.1,
            "embedding_size": 128,
            "hidden_act": "gelu",
            "hidden_dropout_prob": 0.1,
            "hidden_size": 256,
            "initializer_range": 0.02,
            "intermediate_size": 1024,
            "max_position_embeddings": 512,
            "num_attention_heads": 4,
            "num_hidden_layers": 12,
            "pad_token_id": 0,
            "type_vocab_size": 2,
            "vocab_size": 30522
        },
        "electra-base": {
            "attention_probs_dropout_prob": 0.1,
            "embedding_size": 768,
            "hidden_act": "gelu",
            "hidden_dropout_prob": 0.1,
            "hidden_size": 768,
            "initializer_range": 0.02,
            "intermediate_size": 3072,
            "max_position_embeddings": 512,
            "num_attention_heads": 12,
            "num_hidden_layers": 12,
            "pad_token_id": 0,
            "type_vocab_size": 2,
            "vocab_size": 30522
        },
        "electra-large": {
            "attention_probs_dropout_prob": 0.1,
            "embedding_size": 1024,
            "hidden_act": "gelu",
            "hidden_dropout_prob": 0.1,
            "hidden_size": 1024,
            "initializer_range": 0.02,
            "intermediate_size": 4096,
            "max_position_embeddings": 512,
            "num_attention_heads": 16,
            "num_hidden_layers": 24,
            "pad_token_id": 0,
            "type_vocab_size": 2,
            "vocab_size": 30522
        },
        "chinese-electra-small": {
            "attention_probs_dropout_prob": 0.1,
            "embedding_size": 128,
            "hidden_act": "gelu",
            "hidden_dropout_prob": 0.1,
            "hidden_size": 256,
            "initializer_range": 0.02,
            "intermediate_size": 1024,
            "max_position_embeddings": 512,
            "num_attention_heads": 4,
            "num_hidden_layers": 12,
            "pad_token_id": 0,
            "type_vocab_size": 2,
            "vocab_size": 21128,
        },
        "chinese-electra-base": {
            "attention_probs_dropout_prob": 0.1,
            "embedding_size": 768,
            "hidden_act": "gelu",
            "hidden_dropout_prob": 0.1,
            "hidden_size": 768,
            "initializer_range": 0.02,
            "intermediate_size": 3072,
            "max_position_embeddings": 512,
            "num_attention_heads": 12,
            "num_hidden_layers": 12,
            "pad_token_id": 0,
            "type_vocab_size": 2,
            "vocab_size": 21128,
        },
    }
    resource_files_names = {"model_state": "model_state.pdparams"}
    pretrained_resource_files_map = {
        "model_state": {
            "electra-small":
            "http://paddlenlp.bj.bcebos.com/models/transformers/electra/electra-small.pdparams",
            "electra-base":
            "http://paddlenlp.bj.bcebos.com/models/transformers/electra/electra-base.pdparams",
            "electra-large":
            "http://paddlenlp.bj.bcebos.com/models/transformers/electra/electra-large.pdparams",
            "chinese-electra-small":
            "http://paddlenlp.bj.bcebos.com/models/transformers/chinese-electra-small/chinese-electra-small.pdparams",
            "chinese-electra-base":
            "http://paddlenlp.bj.bcebos.com/models/transformers/chinese-electra-base/chinese-electra-base.pdparams",
        }
    }

    def init_weights(self):
        """
        Initializes and tie weights if needed.
        """
        # Initialize weights
        self.apply(self._init_weights)
        # Tie weights if needed
        self.tie_weights()

    def tie_weights(self):
        """
        Tie the weights between the input embeddings and the output embeddings.

        Nothing happens unless the instance defines both
        ``get_output_embeddings`` and ``get_input_embeddings`` and the former
        returns something other than ``None``.
        """
        if hasattr(self, "get_output_embeddings") and hasattr(
                self, "get_input_embeddings"):
            output_embeddings = self.get_output_embeddings()
            if output_embeddings is not None:
                self._tie_or_clone_weights(output_embeddings,
                                           self.get_input_embeddings())

    def _init_weights(self, layer):
        """Initialize the weights of a single sublayer.

        ``Linear`` and ``Embedding`` weights are redrawn from a normal
        distribution with mean 0 and standard deviation ``initializer_range``;
        ``LayerNorm`` is reset to weight 1 / bias 0 with its ``_epsilon``
        attribute set to 1e-12; ``Linear`` biases are zeroed.

        Args:
            layer: The sublayer to initialize.
        """
        if isinstance(layer, (nn.Linear, nn.Embedding)):
            # Head models do not carry ``initializer_range`` themselves, so
            # fall back to the wrapped base model's config.
            layer.weight.set_value(
                paddle.tensor.normal(
                    mean=0.0,
                    std=self.initializer_range
                    if hasattr(self, "initializer_range") else
                    self.electra.config["initializer_range"],
                    shape=layer.weight.shape))
        elif isinstance(layer, nn.LayerNorm):
            layer.bias.set_value(paddle.zeros_like(layer.bias))
            layer.weight.set_value(paddle.full_like(layer.weight, 1.0))
            layer._epsilon = 1e-12
        if isinstance(layer, nn.Linear) and layer.bias is not None:
            layer.bias.set_value(paddle.zeros_like(layer.bias))

    def _tie_or_clone_weights(self, output_embeddings, input_embeddings):
        """Tie or clone layer weights.

        If both weights have the same shape the output layer is rebound to
        the very same parameter (tied). If the output weight has the shape of
        the transposed input weight, the transposed values are copied in
        (cloned).

        Args:
            output_embeddings: Layer whose ``weight`` is overwritten.
            input_embeddings: Layer providing the source ``weight``.

        Raises:
            ValueError: If the shapes match in neither orientation, or if the
                output layer has a bias whose length differs from the last
                dimension of its weight.
        """
        output_shape = output_embeddings.weight.shape
        input_shape = input_embeddings.weight.shape
        if output_shape == input_shape:
            output_embeddings.weight = input_embeddings.weight
        else:
            # Transpose once and reuse it for the check, the copy and the
            # error message.
            transposed_weight = input_embeddings.weight.t()
            if output_shape == transposed_weight.shape:
                output_embeddings.weight.set_value(transposed_weight)
            else:
                raise ValueError(
                    "when tie input/output embeddings, the shape of output embeddings: {} "
                    "should be equal to shape of input embeddings: {} "
                    "or should be equal to the shape of transpose input embeddings: {}".
                    format(output_shape, input_shape,
                           transposed_weight.shape))
        if getattr(output_embeddings, "bias", None) is not None:
            weight_last_dim = output_embeddings.weight.shape[-1]
            bias_dim = output_embeddings.bias.shape[0]
            if weight_last_dim != bias_dim:
                raise ValueError(
                    "the weight last shape: {} of output_embeddings is not equal to the bias shape: {}, "
                    "please check output_embeddings configuration".format(
                        weight_last_dim, bias_dim))
@register_base_model
class ElectraModel(ElectraPretrainedModel):
    """Base ELECTRA network: embeddings, optional projection, encoder stack."""

    def __init__(self, vocab_size, embedding_size, hidden_size,
                 num_hidden_layers, num_attention_heads, intermediate_size,
                 hidden_act, hidden_dropout_prob, attention_probs_dropout_prob,
                 max_position_embeddings, type_vocab_size, initializer_range,
                 pad_token_id):
        """Assemble the embeddings and the transformer encoder.

        A linear ``embeddings_project`` layer is created only when
        ``embedding_size`` differs from ``hidden_size``.
        """
        super().__init__()
        self.pad_token_id = pad_token_id
        self.initializer_range = initializer_range

        self.embeddings = ElectraEmbeddings(
            vocab_size, embedding_size, hidden_dropout_prob,
            max_position_embeddings, type_vocab_size)

        if embedding_size != hidden_size:
            self.embeddings_project = nn.Linear(embedding_size, hidden_size)

        single_layer = nn.TransformerEncoderLayer(
            hidden_size,
            num_attention_heads,
            intermediate_size,
            dropout=hidden_dropout_prob,
            activation=hidden_act,
            attn_dropout=attention_probs_dropout_prob,
            act_dropout=0)
        self.encoder = nn.TransformerEncoder(single_layer, num_hidden_layers)

        self.init_weights()

    def get_input_embeddings(self):
        """Return the word embedding layer."""
        return self.embeddings.word_embeddings

    def set_input_embeddings(self, value):
        """Replace the word embedding layer with ``value``."""
        self.embeddings.word_embeddings = value

    def forward(self,
                input_ids,
                token_type_ids=None,
                position_ids=None,
                attention_mask=None):
        """Encode ``input_ids`` and return the encoder output.

        Args:
            input_ids: Integer tensor of token ids.
            token_type_ids: Optional token-type ids, forwarded to the
                embeddings.
            position_ids: Optional position ids, forwarded to the embeddings.
            attention_mask: Optional mask passed straight to the encoder.
                When omitted, one is derived from the padding positions.

        Returns:
            Whatever ``self.encoder`` returns for the embedded input.
        """
        if attention_mask is None:
            # Additive mask: -1e9 where the token equals pad_token_id and 0
            # elsewhere, with two singleton axes inserted at positions 1, 2.
            is_pad = (input_ids == self.pad_token_id
                      ).astype(paddle.get_default_dtype())
            attention_mask = paddle.unsqueeze(is_pad * -1e9, axis=[1, 2])

        hidden = self.embeddings(
            input_ids=input_ids,
            position_ids=position_ids,
            token_type_ids=token_type_ids)

        # The projection only exists when embedding and hidden widths differ.
        if hasattr(self, "embeddings_project"):
            hidden = self.embeddings_project(hidden)

        return self.encoder(hidden, attention_mask)
class ElectraDiscriminator(ElectraPretrainedModel):
    """ELECTRA base model followed by the discriminator prediction head."""

    def __init__(self, electra):
        """Wrap ``electra`` and build the head from its config.

        Args:
            electra: Base model; its ``config`` supplies ``hidden_size`` and
                ``hidden_act``.
        """
        super().__init__()
        self.electra = electra
        config = self.electra.config
        self.discriminator_predictions = ElectraDiscriminatorPredictions(
            config["hidden_size"], config["hidden_act"])
        self.init_weights()

    def forward(self,
                input_ids,
                token_type_ids=None,
                position_ids=None,
                attention_mask=None):
        """Run the base model and return the discriminator logits."""
        sequence_output = self.electra(input_ids, token_type_ids,
                                       position_ids, attention_mask)
        return self.discriminator_predictions(sequence_output)
class ElectraGenerator(ElectraPretrainedModel):
    """ELECTRA base model followed by the generator (language-model) head."""

    def __init__(self, electra):
        """Wrap ``electra`` and build the generator head.

        When ``tie_word_embeddings`` is False a separate ``Linear`` LM head is
        created; otherwise only a bias vector is created and the word
        embedding matrix is reused as the output projection in ``forward``.

        Args:
            electra: Base model; its ``config`` supplies ``embedding_size``,
                ``hidden_size``, ``hidden_act`` and ``vocab_size``.
        """
        super(ElectraGenerator, self).__init__()
        self.electra = electra
        self.generator_predictions = ElectraGeneratorPredictions(
            self.electra.config["embedding_size"],
            self.electra.config["hidden_size"],
            self.electra.config["hidden_act"])

        if not self.tie_word_embeddings:
            self.generator_lm_head = nn.Linear(
                self.electra.config["embedding_size"],
                self.electra.config["vocab_size"])
        else:
            # Use the layer's own parameter factory rather than the legacy
            # ``paddle.fluid.layers.create_parameter`` so the class does not
            # depend on the ``paddle.fluid`` namespace.
            self.generator_lm_head_bias = self.create_parameter(
                shape=[self.electra.config["vocab_size"]],
                dtype=paddle.get_default_dtype(),
                is_bias=True)
        self.init_weights()

    def get_input_embeddings(self):
        """Return the base model's word embedding layer."""
        return self.electra.embeddings.word_embeddings

    def forward(self,
                input_ids=None,
                token_type_ids=None,
                position_ids=None,
                attention_mask=None):
        """Return vocabulary scores for every position.

        Args:
            input_ids: Integer tensor of token ids.
            token_type_ids: Optional token-type ids.
            position_ids: Optional position ids.
            attention_mask: Optional mask forwarded to the base model.

        Returns:
            Prediction scores whose last axis has ``vocab_size`` entries.
        """
        generator_sequence_output = self.electra(input_ids, token_type_ids,
                                                 position_ids, attention_mask)
        prediction_scores = self.generator_predictions(
            generator_sequence_output)
        if not self.tie_word_embeddings:
            prediction_scores = self.generator_lm_head(prediction_scores)
        else:
            # Tied head: multiply by the transposed word embedding matrix and
            # add the stand-alone bias.
            prediction_scores = paddle.add(paddle.matmul(
                prediction_scores,
                self.get_input_embeddings().weight,
                transpose_y=True),
                                           self.generator_lm_head_bias)
        return prediction_scores
# class ElectraClassificationHead and ElectraForSequenceClassification for fine-tuning
class ElectraClassificationHead(nn.Layer):
    """Head for sentence-level classification tasks."""

    def __init__(self, hidden_size, hidden_dropout_prob, num_classes):
        """Build dense -> gelu -> projection, with a shared dropout layer.

        Args:
            hidden_size: Width of the incoming features.
            hidden_dropout_prob: Dropout probability.
            num_classes: Number of output logits.
        """
        super().__init__()
        self.dense = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(hidden_dropout_prob)
        self.out_proj = nn.Linear(hidden_size, num_classes)
        self.act = get_activation("gelu")

    def forward(self, features, **kwargs):
        """Classify from the first position of ``features``.

        Args:
            features: Tensor indexed as ``[:, 0, :]`` to pick the first token.
            **kwargs: Accepted and ignored.

        Returns:
            Logits produced by ``out_proj``.
        """
        first_token = features[:, 0, :]  # take <s> token (equiv. to [CLS])
        hidden = self.dense(self.dropout(first_token))
        hidden = self.act(hidden)  # Electra paper used gelu here
        return self.out_proj(self.dropout(hidden))
class ElectraForSequenceClassification(ElectraPretrainedModel):
    """ELECTRA base model with a sentence-level classification head."""

    def __init__(self, electra, num_classes):
        """Wrap ``electra`` and attach an ``ElectraClassificationHead``.

        Args:
            electra: Base model; its ``config`` supplies ``hidden_size`` and
                ``hidden_dropout_prob``.
            num_classes: Number of output classes.
        """
        super().__init__()
        self.num_classes = num_classes
        self.electra = electra
        config = self.electra.config
        self.classifier = ElectraClassificationHead(
            config["hidden_size"], config["hidden_dropout_prob"],
            self.num_classes)
        self.init_weights()

    def forward(self,
                input_ids=None,
                token_type_ids=None,
                position_ids=None,
                attention_mask=None):
        """Run the base model and return the classification logits."""
        encoded = self.electra(input_ids, token_type_ids, position_ids,
                               attention_mask)
        return self.classifier(encoded)
class ElectraForTokenClassification(ElectraPretrainedModel):
    """ELECTRA base model with a per-token linear classifier."""

    def __init__(self, electra, num_classes):
        """Wrap ``electra`` and attach dropout plus a linear classifier.

        Args:
            electra: Base model; its ``config`` supplies ``hidden_size`` and
                ``hidden_dropout_prob``.
            num_classes: Number of output classes per token.
        """
        super().__init__()
        self.num_classes = num_classes
        self.electra = electra
        config = self.electra.config
        self.dropout = nn.Dropout(config["hidden_dropout_prob"])
        self.classifier = nn.Linear(config["hidden_size"], self.num_classes)
        self.init_weights()

    def forward(self,
                input_ids=None,
                token_type_ids=None,
                position_ids=None,
                attention_mask=None):
        """Run the base model, apply dropout and return per-token logits."""
        encoded = self.electra(input_ids, token_type_ids, position_ids,
                               attention_mask)
        return self.classifier(self.dropout(encoded))
class ElectraForTotalPretraining(ElectraPretrainedModel):
    """Joint generator + discriminator model used for ELECTRA pretraining.

    The generator predicts tokens, its samples replace tokens at the masked
    positions, and the discriminator scores every position of the resulting
    sequence.
    """
    pretrained_init_configuration = {
        "electra-small-generator": {
            "attention_probs_dropout_prob": 0.1,
            "embedding_size": 128,
            "hidden_act": "gelu",
            "hidden_dropout_prob": 0.1,
            "hidden_size": 256,
            "initializer_range": 0.02,
            "intermediate_size": 1024,
            "max_position_embeddings": 512,
            "num_attention_heads": 4,
            "num_hidden_layers": 12,
            "pad_token_id": 0,
            "type_vocab_size": 2,
            "vocab_size": 30522
        },
        "electra-base-generator": {
            "attention_probs_dropout_prob": 0.1,
            "embedding_size": 768,
            "hidden_act": "gelu",
            "hidden_dropout_prob": 0.1,
            "hidden_size": 256,
            "initializer_range": 0.02,
            "intermediate_size": 1024,
            "max_position_embeddings": 512,
            "num_attention_heads": 4,
            "num_hidden_layers": 12,
            "pad_token_id": 0,
            "type_vocab_size": 2,
            "vocab_size": 30522
        },
        "electra-large-generator": {
            "attention_probs_dropout_prob": 0.1,
            "embedding_size": 1024,
            "hidden_act": "gelu",
            "hidden_dropout_prob": 0.1,
            "hidden_size": 256,
            "initializer_range": 0.02,
            "intermediate_size": 1024,
            "max_position_embeddings": 512,
            "num_attention_heads": 4,
            "num_hidden_layers": 24,
            "pad_token_id": 0,
            "type_vocab_size": 2,
            "vocab_size": 30522
        },
        "electra-small-discriminator": {
            "attention_probs_dropout_prob": 0.1,
            "embedding_size": 128,
            "hidden_act": "gelu",
            "hidden_dropout_prob": 0.1,
            "hidden_size": 256,
            "initializer_range": 0.02,
            "intermediate_size": 1024,
            "max_position_embeddings": 512,
            "num_attention_heads": 4,
            "num_hidden_layers": 12,
            "pad_token_id": 0,
            "type_vocab_size": 2,
            "vocab_size": 30522
        },
        "electra-base-discriminator": {
            "attention_probs_dropout_prob": 0.1,
            "embedding_size": 768,
            "hidden_act": "gelu",
            "hidden_dropout_prob": 0.1,
            "hidden_size": 768,
            "initializer_range": 0.02,
            "intermediate_size": 3072,
            "max_position_embeddings": 512,
            "num_attention_heads": 12,
            "num_hidden_layers": 12,
            "pad_token_id": 0,
            "type_vocab_size": 2,
            "vocab_size": 30522
        },
        "electra-large-discriminator": {
            "attention_probs_dropout_prob": 0.1,
            "embedding_size": 1024,
            "hidden_act": "gelu",
            "hidden_dropout_prob": 0.1,
            "hidden_size": 1024,
            "initializer_range": 0.02,
            "intermediate_size": 4096,
            "max_position_embeddings": 512,
            "num_attention_heads": 16,
            "num_hidden_layers": 24,
            "pad_token_id": 0,
            "type_vocab_size": 2,
            "vocab_size": 30522
        },
    }

    def __init__(self, generator, discriminator):
        """Combine a generator and a discriminator.

        Args:
            generator: Model exposing ``.electra`` and returning vocabulary
                logits when called.
            discriminator: Model exposing ``.electra`` (whose
                ``initializer_range`` is adopted here) and returning
                per-position logits when called.
        """
        super(ElectraForTotalPretraining, self).__init__()
        self.generator = generator
        self.discriminator = discriminator
        self.initializer_range = discriminator.electra.initializer_range
        self.init_weights()

    def get_input_embeddings(self):
        """Return the generator's word embeddings, or None when untied."""
        if not self.untied_generator_embeddings:
            return self.generator.electra.embeddings.word_embeddings
        else:
            return None

    def get_output_embeddings(self):
        """Return the discriminator's word embeddings, or None when untied."""
        if not self.untied_generator_embeddings:
            return self.discriminator.electra.embeddings.word_embeddings
        else:
            return None

    def sample_from_softmax(self, logits, use_softmax_sample=True):
        """Pick one index along the last axis of ``logits`` as a one-hot.

        Args:
            logits: Score tensor; the choice is made over its last axis.
            use_softmax_sample: When True, noise computed as
                ``-log(-log(u + 1e-9) + 1e-9)`` from uniform random ``u`` is
                added before the argmax; when False the plain argmax is used.

        Returns:
            One-hot encoding (depth ``logits.shape[-1]``) of the chosen index.
        """
        if use_softmax_sample:
            uniform_noise = paddle.rand(
                logits.shape, dtype=paddle.get_default_dtype())
            gumbel_noise = -paddle.log(-paddle.log(uniform_noise + 1e-9) + 1e-9)
        else:
            gumbel_noise = paddle.zeros_like(logits)
        # softmax_sample equal to sampled_tokids.unsqueeze(-1)
        softmax_sample = paddle.argmax(
            F.softmax(logits + gumbel_noise), axis=-1)
        # one hot
        return F.one_hot(softmax_sample, logits.shape[-1])

    def update_inputs(self, sequence, updates, positions):
        """Overwrite ``sequence`` with ``updates`` where ``positions`` is 1.

        Computes ``(1 - positions) * sequence + positions * updates``
        element-wise.

        Args:
            sequence: Two-dimensional tensor of token ids.
            updates: Replacement values, combined element-wise with
                ``sequence``.
            positions: Multiplicative selector; its second dimension must
                equal that of ``sequence``.

        Returns:
            The updated sequence.
        """
        shape = sequence.shape
        assert (len(shape) == 2), "the dimension of inputs should be [B, L]"
        L = shape[1]
        N = positions.shape[1]
        assert (
            N == L), "the dimension of inputs and mask should be same as [B, L]"

        updated_sequence = ((
            (paddle.ones_like(sequence) - positions) * sequence) +
                            (positions * updates))
        return updated_sequence

    def get_discriminator_inputs(self, inputs, raw_inputs, gen_logits,
                                 gen_labels, use_softmax_sample):
        """Turn generator output into the discriminator's inputs and labels.

        NOTE(review): ``forward`` called this method but no definition was
        present in this class or in ``ElectraPretrainedModel``; it is
        implemented here from the existing helpers ``sample_from_softmax`` and
        ``update_inputs`` and the -100 "unmasked" label convention used by
        ``ElectraPretrainingCriterion``. Confirm against the intended
        pretraining recipe.

        Args:
            inputs: Token ids fed to the generator (masked sequence).
            raw_inputs: Token ids of the original, unmasked sequence.
            gen_logits: Generator vocabulary logits for every position.
            gen_labels: Generator labels; -100 marks positions that were not
                masked.
            use_softmax_sample: Forwarded to ``sample_from_softmax``.

        Returns:
            Tuple ``(updated_inputs, labels, sampled_tokids)``: the sequence
            with sampled tokens written at the masked positions, a 0/1 tensor
            that is 1 where a masked position ended up different from
            ``raw_inputs``, and the sampled token ids for every position.
        """
        # Sample one token per position from the generator distribution.
        sampled_one_hot = self.sample_from_softmax(gen_logits,
                                                   use_softmax_sample)
        sampled_tokids = paddle.argmax(
            sampled_one_hot, axis=-1).astype(inputs.dtype)

        # 1 at masked positions, 0 elsewhere, in the dtype of the token ids so
        # that the integer arithmetic in update_inputs stays homogeneous.
        mask_positions = (gen_labels != -100).astype(inputs.dtype)

        # Replace tokens only at the masked positions.
        updated_inputs = self.update_inputs(inputs, sampled_tokids,
                                            mask_positions)

        # A position counts as "replaced" only if it was masked and the
        # sampled token differs from the original token.
        differs = (updated_inputs != raw_inputs).astype(inputs.dtype)
        labels = mask_positions * differs
        return updated_inputs, labels, sampled_tokids

    def forward(self,
                input_ids=None,
                token_type_ids=None,
                position_ids=None,
                attention_mask=None,
                raw_input_ids=None,
                gen_labels=None):
        """Run generator and discriminator for one pretraining step.

        Args:
            input_ids: Masked token ids fed to the generator.
            token_type_ids: Optional token-type ids shared by both models.
            position_ids: Optional position ids shared by both models.
            attention_mask: Optional mask forwarded to both models.
            raw_input_ids: Original (unmasked) token ids; required.
            gen_labels: Generator labels with -100 at unmasked positions;
                required.

        Returns:
            Tuple ``(gen_logits, disc_logits, disc_labels, attention_mask)``
            where the returned ``attention_mask`` is boolean: the provided
            mask cast to bool, or ``input_ids != pad_token_id`` when none was
            provided.
        """
        assert (
            gen_labels is not None
        ), "gen_labels should not be None, please check DataCollatorForLanguageModeling"
        # Discriminator labels are derived by comparing against the original
        # sequence, so it must be supplied as well.
        assert (
            raw_input_ids is not None
        ), "raw_input_ids should not be None, please check DataCollatorForLanguageModeling"

        gen_logits = self.generator(input_ids, token_type_ids, position_ids,
                                    attention_mask)

        disc_inputs, disc_labels, generator_predict_tokens = self.get_discriminator_inputs(
            input_ids, raw_input_ids, gen_logits, gen_labels,
            self.use_softmax_sample)

        disc_logits = self.discriminator(disc_inputs, token_type_ids,
                                         position_ids, attention_mask)

        if attention_mask is None:
            attention_mask = (
                input_ids != self.discriminator.electra.config["pad_token_id"])
        else:
            attention_mask = attention_mask.astype("bool")

        return gen_logits, disc_logits, disc_labels, attention_mask
class ElectraPretrainingCriterion(paddle.nn.Layer):
    """Weighted sum of the generator and discriminator pretraining losses."""

    def __init__(self, vocab_size, gen_weight, disc_weight):
        """Store the weights and create the two element-wise loss functions.

        Args:
            vocab_size: Size of the last axis the generator scores are
                reshaped to.
            gen_weight: Multiplier applied to the generator loss.
            disc_weight: Multiplier applied to the discriminator loss.
        """
        super().__init__()
        self.vocab_size = vocab_size
        self.gen_weight = gen_weight
        self.disc_weight = disc_weight
        self.gen_loss_fct = nn.CrossEntropyLoss(reduction='none')
        self.disc_loss_fct = nn.BCEWithLogitsLoss(reduction='none')

    def forward(self, generator_prediction_scores,
                discriminator_prediction_scores, generator_labels,
                discriminator_labels, attention_mask):
        """Compute ``gen_weight * gen_loss + disc_weight * disc_loss``.

        Args:
            generator_prediction_scores: Generator logits, reshaped to
                ``[-1, vocab_size]``.
            discriminator_prediction_scores: Discriminator logits, reshaped to
                ``[-1, seq_length]``.
            generator_labels: Generator targets; -100 marks positions excluded
                from the averaging denominator.
            discriminator_labels: Discriminator targets; the second dimension
                defines ``seq_length``.
            attention_mask: Optional condition tensor selecting the positions
                that contribute to the discriminator loss; ``None`` means all
                positions contribute.

        Returns:
            The combined loss tensor.
        """
        float_dtype = paddle.get_default_dtype()

        # generator loss
        flat_scores = paddle.reshape(generator_prediction_scores,
                                     [-1, self.vocab_size])
        flat_labels = paddle.reshape(generator_labels, [-1])
        gen_token_loss = self.gen_loss_fct(flat_scores, flat_labels)
        # todo: we can remove 4 lines after when CrossEntropyLoss(reduction='mean') improved
        gen_zeros = paddle.zeros_like(generator_labels).astype(float_dtype)
        gen_ones = paddle.ones_like(generator_labels).astype(float_dtype)
        labelled = paddle.where(generator_labels == -100, gen_zeros, gen_ones)
        labelled_count = labelled.sum()
        if labelled_count == 0:
            # Nothing was masked: report a zero generator loss instead of
            # dividing by zero.
            gen_loss = paddle.to_tensor([0.0])
        else:
            gen_loss = gen_token_loss.sum() / labelled_count

        # discriminator loss
        seq_length = discriminator_labels.shape[1]
        disc_token_loss = self.disc_loss_fct(
            paddle.reshape(discriminator_prediction_scores, [-1, seq_length]),
            discriminator_labels.astype(float_dtype))
        if attention_mask is None:
            every_position = paddle.ones_like(discriminator_labels).astype(
                float_dtype)
            disc_loss = disc_token_loss.sum() / every_position.sum()
        else:
            disc_ones = paddle.ones_like(discriminator_labels).astype(
                float_dtype)
            disc_zeros = paddle.zeros_like(discriminator_labels).astype(
                float_dtype)
            # Keep loss and count only where the mask is set.
            kept_loss = paddle.where(attention_mask, disc_token_loss,
                                     disc_zeros)
            kept_count = paddle.where(attention_mask, disc_ones, disc_zeros)
            disc_loss = kept_loss.sum() / kept_count.sum()

        return self.gen_weight * gen_loss + self.disc_weight * disc_loss