import torch from flash_attn.flash_attention import FlashAttention from transformers.models.t5.modeling_t5 import T5Attention class T5FlashAttention(T5Attention): def __init__(self, config, has_relative_attention_bias=False): super().__init__(config, has_relative_attention_bias) self.flash_attn = FlashAttention( attention_dropout=self.dropout ) def compute_flash_attention(self, qkv): ctx, _ = self.flash_attn(qkv) return ctx def forward( self, hidden_states, mask=None, key_value_states=None, position_bias=None, past_key_value=None, layer_head_mask=None, query_length=None, use_cache=False, output_attentions=False, ): """ Self-attention (if key_value_states is None) or attention over source sentence (provided by key_value_states). """ # Input is (batch_size, seq_length, dim) # Mask is (batch_size, key_length) (non-causal) or (batch_size, key_length, key_length) # past_key_value[0] is (batch_size, n_heads, q_len - 1, dim_per_head) batch_size, seq_length = hidden_states.shape[:2] real_seq_length = seq_length if past_key_value is not None: assert ( len(past_key_value) == 2 ), f"past_key_value should have 2 past states: keys and values. Got { len(past_key_value)} past states" real_seq_length += past_key_value[0].shape[2] if query_length is None else query_length key_length = real_seq_length if key_value_states is None else key_value_states.shape[1] def shape(states): """projection""" return states.view(batch_size, -1, self.n_heads, self.key_value_proj_dim) def unshape(states): """reshape""" return states.contiguous().view(batch_size, -1, self.inner_dim) def project(hidden_states, proj_layer, key_value_states, past_key_value): """projects hidden states correctly to key/query states""" if key_value_states is None: # self-attn # (batch_size, n_heads, seq_length, dim_per_head) hidden_states = shape(proj_layer(hidden_states)) elif past_key_value is None: # cross-attn # (batch_size, n_heads, seq_length, dim_per_head) hidden_states = shape(proj_layer(key_value_states)) if past_key_value is not None: if key_value_states is None: # self-attn # (batch_size, n_heads, key_length, dim_per_head) hidden_states = torch.cat([past_key_value, hidden_states], dim=2) elif past_key_value.shape[2] != key_value_states.shape[1]: # checking that the `sequence_length` of the `past_key_value` is the same as # the provided `key_value_states` to support prefix tuning # cross-attn # (batch_size, n_heads, seq_length, dim_per_head) hidden_states = shape(proj_layer(key_value_states)) else: # cross-attn hidden_states = past_key_value return hidden_states # get query states query_states = shape(self.q(hidden_states)) # (batch_size, seq_length, n_heads, dim_per_head) # get key/value states key_states = project( hidden_states, self.k, key_value_states, past_key_value[0] if past_key_value is not None else None ) value_states = project( hidden_states, self.v, key_value_states, past_key_value[1] if past_key_value is not None else None ) qkv = torch.stack([query_states, key_states, value_states], dim=2) attn_output = unshape(self.compute_flash_attention(qkv)) # (batch_size, seq_length, dim) attn_output = self.o(attn_output) present_key_value_state = (key_states, value_states) if (self.is_decoder and use_cache) else None outputs = (attn_output,) + (present_key_value_state,) + (position_bias,) if output_attentions: outputs = outputs + (attn_weights,) return outputs def add_flash_attn(model, model_config): for i, c in enumerate(model.encoder.block.children()): c.layer[0].SelfAttention = T5FlashAttention(model_config, has_relative_attention_bias=bool(i == 0)) for i, c in enumerate(model.decoder.block.children()): c.layer[0].SelfAttention = T5FlashAttention(model_config, has_relative_attention_bias=bool(i == 0)) c.layer[1].EncDecAttention = T5FlashAttention(model_config, has_relative_attention_bias=bool(i == 0))