Natural Language to SQL (NL2SQL)

TL;DR

  • The OpenAI API is an effective way to generate synthetic training data for specialized use cases.

  • The Hugging Face Transformers library, together with the PEFT library (Parameter-Efficient Fine-Tuning), offers useful tools and models for fine-tuning LLMs.

  • A fine-tuned LongT5 model can generate customized SQL queries with modest computational resources.

Most LLMs can handle a wide range of tasks. However, a smaller, specialized model is sometimes the better choice, for example in resource-constrained environments or when data must stay private. Fortunately, fine-tuning a small model is not that difficult.

In my use case, I wanted to fine-tune an LLM that takes two inputs: the context of an existing SQL database (tables, columns, and foreign keys) and a natural language request that reads or changes the database content. From these, the model generates the corresponding SQL query.

Querying Database Information

The context information can be read from the database's information_schema. The following Node.js function collects the tables, their columns, and the foreign keys:

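// Assumption: `pool` is a node-postgres (pg) connection pool that reads its
// settings from the standard PG* environment variables.
const { Pool } = require('pg');
const pool = new Pool();

async function getTables() {
  let client;
  try {
    // Connect to the database
    client = await pool.connect();

    // One row per table, with all of its columns aggregated into one string
    const res = await client.query(`SELECT
    table_name AS "table",
    string_agg(column_name || ': ' || data_type || ', Nullable: ' || is_nullable, '; ') AS "columns"
FROM
    information_schema.columns
WHERE
    table_schema = 'public'
GROUP BY
    table_name
ORDER BY
    table_name;
`);

    // One row per foreign key column and the column it references.
    // Constraint names are not unique across schemas, so join on the schema too.
    const foreignKeys = await client.query(`SELECT
    tc.table_name AS "table",
    kcu.column_name AS "column",
    ccu.table_name AS "ref_table",
    ccu.column_name AS "ref_column"
FROM
    information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
    ON tc.constraint_name = kcu.constraint_name
    AND tc.constraint_schema = kcu.constraint_schema
JOIN information_schema.constraint_column_usage ccu
    ON ccu.constraint_name = tc.constraint_name
    AND ccu.constraint_schema = tc.constraint_schema
WHERE
    tc.constraint_type = 'FOREIGN KEY' AND
    tc.table_schema = 'public';
`);

    const database = {
      tables: res.rows,
      foreignKeys: foreignKeys.rows,
    };

    // JSON.stringify without indentation already yields a single-line string
    const databaseString = JSON.stringify(database);

    console.log(databaseString);
  } catch (err) {
    console.error(err);
  } finally {
    // Release the client back to the pool, even if a query failed
    if (client) client.release();
  }
}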
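
The model input used later in this post combines this context with the natural language query and a JSON object. A minimal Python sketch of how such an input string could be assembled is shown here. The function name build_model_input is hypothetical; only the field names (nl_query, context, json_obj) are taken from the examples further down.

```python
import json


def build_model_input(nl_query, tables, foreign_keys, json_obj=None):
    """Serialize the request and the schema context into one compact JSON string.

    Hypothetical helper: the field names follow the training examples in this
    post, everything else is an assumption.
    """
    payload = {
        "nl_query": nl_query,
        "context": {"tables": tables, "foreignKeys": foreign_keys},
        "json_obj": json_obj or {},
    }
    # Compact separators avoid spending input tokens on whitespace
    return json.dumps(payload, separators=(",", ":"))


example_input = build_model_input(
    "return all cars",
    tables=[{"table": "car_750", "columns": "make: character varying; year: smallint"}],
    foreign_keys=[],
)
print(example_input)
```

Keeping the serialization in one place makes it easier to guarantee that training and inference inputs have exactly the same shape.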

Generating Synthetic Data

Next, I needed training data. The OpenAI API works well for generating synthetic data. The function below sends a single request and returns the generated choices:

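from openai import OpenAI


def generate_data(api_key, system_prompt, initial_prompt, num_options):
    """Send one chat completion request and return its choices."""
    client = OpenAI(api_key=api_key)
    response = client.chat.completions.create(
        model="gpt-3.5-turbo-1106",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": initial_prompt},
        ],
        n=num_options,  # number of alternative completions per request
        response_format={"type": "json_object"},  # JSON mode
        # Rough cap on the output length: len() counts characters, not tokens
        max_tokens=4096 - len(initial_prompt),
    )
    return response.choices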

This function can then be called in parallel to speed up the data generation:

import datetime
import json
from concurrent.futures import ThreadPoolExecutor, as_completed


def generate_synthetic_data(api_key: str, initial_prompt: str, system_prompt: str, num_examples: int, num_options: int, output_file: str = 'generated_data') -> None:
    output_file += datetime.datetime.now().strftime("%Y-%m-%d-%H_%M_%S") + ".jsonl"

    with open("./generated_categories/" + output_file, "w") as file:
        prompt = {"system_prompt": system_prompt, "initial_prompt": initial_prompt}
        file.write(json.dumps(prompt) + '\n')

        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(generate_data, api_key, system_prompt, initial_prompt, num_options) for _ in range(num_examples)]
            for future in as_completed(futures):
                try:
                    choices = future.result(timeout=60)
                    for choice in choices:
                        content_json = json.loads(choice.message.content)
                        file.write(json.dumps(content_json) + '\n')
                except TimeoutError:
                    print("TimeoutError")
                except Exception as e:
                    print(f"Error: {e}")

Here I am using ten threads to generate data in parallel.

LLMs usually require detailed instructions to generate high-quality output. To get as much output as possible per request, I instructed the model to generate as many examples as it could:


import os

from dotenv import load_dotenv

load_dotenv('.pyenv')
api_key = os.getenv('OPENAI_API_KEY')
system_prompt = "you are an assistant generating json for a synthetic dataset generating as many examples as you can"
initial_prompt = """
"Generate a JSON-formatted list of various objects, each with a set of attributes and their respective data types. Do not include specific instances or values, just the general structure. For example, for a 'fruit' object, list common attributes like 'color', 'edible', etc., and specify their SQL data types like character, text, integer, character varying, uuid, jsonb, timestamp with time zone, smallint, numeric, timestamp without time zone, boolean, bytea, date, ARRAY, USER-DEFINED, inet, interval. Format the response as JSON, similar to: {'fruit': {'color': 'character', 'edible': 'boolean', 'height': 'smallint'}}. Create similar structures for different objects, including attributes relevant to each category. Create fifteen examples"
"""
num_examples = 1000
num_options = 1
generate_synthetic_data(api_key, initial_prompt,
                        system_prompt, num_examples, num_options)
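
The post does not show how the generated object schemas were turned into the table descriptions that appear as context in the training examples (for example car_750 with a "columns" string). The following is only a sketch of one way to do it. The function name schema_to_table_context and the random numeric suffix are assumptions inferred from table names such as car_750 and planet_850.

```python
import random


def schema_to_table_context(name, attributes, rng):
    """Turn one generated object schema into a table entry for the context.

    Hypothetical helper: the suffix range and the column format are guesses
    based on the example rows shown in this post.
    """
    # A random suffix keeps table names varied across training examples
    table_name = f"{name}_{rng.randint(1, 999)}"
    columns = "; ".join(f"{column}: {dtype}" for column, dtype in attributes.items())
    return {"table": table_name, "columns": columns}


rng = random.Random(0)  # seeded so the sketch is reproducible
generated = {"fruit": {"color": "character", "edible": "boolean", "height": "smallint"}}
tables = [schema_to_table_context(name, attrs, rng) for name, attrs in generated.items()]
print(tables)
```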

I repeated the process for the SQL queries by passing example queries such as the one below as a prompt:

{
    "natural_language_query": "Add a new animal to animal_99 and return its ID.",
    "sql_query": "INSERT INTO animal_99 (species, habitat) VALUES ($1, $2) RETURNING id;",
    "json_body": {
        "species": "",
        "habitat": ""
    },
    "sql_params": ["json_body.species", "json_body.habitat"]
}

I got the best results when I gave the model concrete data types for the response object. Here I used TypeScript, but any common language with a type system should likely work.

Create ten PostgreSQL SQL queries for CRUD operations on a database with JSON responses.
The queries should align with the provided JSON structure, using `this.<field>` in the natural language descriptions. Refer to the object as "this" in the NL-descriptions.
Include nested fields in the JSON body and at least one aggregation.
Your response must follow this TypeScript type:

{
  queries: {
    natural_language_query: string;
    sql_query: string;
    json_body?: Record<string, any>;
    sql_params?: `json_body.${string}`[];
  }[];
}
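
Even in JSON mode, a response is not guaranteed to match this type, so it can help to check each response before writing it to the dataset. The sketch below mirrors the TypeScript type above in plain Python. The function name validate_response is hypothetical and the checks are a minimal interpretation of the type, not the validation used for this dataset.

```python
import json


def validate_response(raw):
    """Return the list of queries if `raw` matches the expected shape.

    Raises ValueError otherwise (json.JSONDecodeError is a ValueError too).
    """
    data = json.loads(raw)
    queries = data.get("queries") if isinstance(data, dict) else None
    if not isinstance(queries, list):
        raise ValueError("'queries' must be a list")
    for i, query in enumerate(queries):
        if not isinstance(query, dict):
            raise ValueError(f"queries[{i}] must be an object")
        # Required string fields
        for key in ("natural_language_query", "sql_query"):
            if not isinstance(query.get(key), str):
                raise ValueError(f"queries[{i}].{key} must be a string")
        # Optional fields
        if "json_body" in query and not isinstance(query["json_body"], dict):
            raise ValueError(f"queries[{i}].json_body must be an object")
        params = query.get("sql_params", [])
        if not isinstance(params, list) or not all(
            isinstance(p, str) and p.startswith("json_body.") for p in params
        ):
            raise ValueError(f"queries[{i}].sql_params must reference json_body fields")
    return queries


good = json.dumps({"queries": [{
    "natural_language_query": "Add a new animal and return its ID.",
    "sql_query": "INSERT INTO animal_99 (species) VALUES ($1) RETURNING id;",
    "json_body": {"species": ""},
    "sql_params": ["json_body.species"],
}]})
print(len(validate_response(good)))
```

Rejecting malformed responses at this stage is cheaper than discovering them after training.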

Two of the resulting training examples are shown below, each as an input string followed by the expected output:

"{""nl_query"": ""return all cars"", ""context"": {""tables"": [{""table"": ""car_750"", ""columns"": ""mileage: numeric; is_used: boolean; make: character varying; year: smallint; color: character varying; model: character varying""}], ""foreignKeys"": []}, ""json_obj"": {}}","{""sql_query"": ""SELECT * FROM car_750"", ""query_params"": []}"

"{""nl_query"": ""select planets with more than item.moons moons"", ""context"": {""tables"": [{""table"": ""planet_850"", ""columns"": ""distance_from_sun: numeric; number_of_moons: integer; diameter: numeric; discovered_date: timestamp with time zone; name: character varying""}], ""foreignKeys"": []}, ""json_obj"": {""moons"": 1}}","{""sql_query"": ""SELECT * FROM planet_850 WHERE number_of_moons > $1"", ""query_params"": [1]}"

In total, I generated 57,905 responses at a cost of less than 20 euros.
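
The fine-tuning code in the next section reads three JSON Lines files (train, test, and valid) whose rows have a "text" and a "target" field. How the generated data was split is not shown in this post, so the following is only a sketch. The function names, the 80/10/10 ratio, and the fixed seed are assumptions.

```python
import json
import random
import tempfile
from pathlib import Path


def split_examples(examples, valid_fraction=0.1, test_fraction=0.1, seed=42):
    """Shuffle the examples and cut them into train, valid, and test lists."""
    shuffled = list(examples)
    random.Random(seed).shuffle(shuffled)
    n_valid = int(len(shuffled) * valid_fraction)
    n_test = int(len(shuffled) * test_fraction)
    return {
        "valid": shuffled[:n_valid],
        "test": shuffled[n_valid:n_valid + n_test],
        "train": shuffled[n_valid + n_test:],
    }


def write_jsonl(path, rows):
    """Write one JSON object per line."""
    with open(path, "w", encoding="utf-8") as file:
        for row in rows:
            file.write(json.dumps(row) + "\n")


examples = [{"text": f"input {i}", "target": f"output {i}"} for i in range(10)]
splits = split_examples(examples)

# A temporary directory stands in for the real output folder in this sketch
with tempfile.TemporaryDirectory() as tmp:
    for name, rows in splits.items():
        write_jsonl(Path(tmp) / f"{name}.jsonl", rows)
    written = sorted(p.name for p in Path(tmp).iterdir())

print({name: len(rows) for name, rows in splits.items()}, written)
```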

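Fine-Tuning the Model

I fine-tuned google/long-t5-tglobal-base with LoRA adapters from the PEFT library, using the Hugging Face Trainer. The code below loads the data, tokenizes it, trains the adapter, pushes it to the Hub, and runs one test query: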

from datasets import load_dataset, DatasetDict

train_dataset = load_dataset("json", data_files="modified_data/train.jsonl")
test_dataset = load_dataset("json", data_files="modified_data/test.jsonl")
valid_dataset = load_dataset("json", data_files="modified_data/valid.jsonl")
dataset = DatasetDict({
    "train": train_dataset["train"],
    "test": test_dataset["train"],
    "valid": valid_dataset["train"]
})
print(dataset)

# Load the tokenizer and the base model
from transformers import AutoTokenizer, LongT5ForConditionalGeneration

model_name = "google/long-t5-tglobal-base"
tokenizer = AutoTokenizer.from_pretrained(model_name, token="myprivatetoken")
model = LongT5ForConditionalGeneration.from_pretrained(model_name)

# Count the parameters of the base model
total_params = sum(p.numel() for p in model.parameters())
print(f"Total number of parameters: {total_params}")

# Tokenize the inputs ("text") and the expected outputs ("target")
def preprocess_function(examples):
    model_inputs = tokenizer(examples["text"], truncation=True)
    labels = tokenizer(examples["target"], truncation=True)

    model_inputs["labels"] = labels["input_ids"]
    return model_inputs

tokenized_datasets = dataset.map(preprocess_function, batched=True)

def check_tokenized_data(tokenized_datasets, num_samples=5):
    for index in range(num_samples):
        # Retrieve an example from the tokenized dataset
        tokenized_example = tokenized_datasets['train'][index]

        # Decode the tokenized text and target
        decoded_text = tokenizer.decode(tokenized_example['input_ids'], skip_special_tokens=True)
        decoded_target = tokenizer.decode(tokenized_example['labels'], skip_special_tokens=True)

        # Print original and tokenized data for comparison
        print(f"Original Text: {dataset['train'][index]['text']}")
        print(f"Decoded Tokenized Text: {decoded_text}\n")
        print(f"Original Target: {dataset['train'][index]['target']}")
        print(f"Decoded Tokenized Target: {decoded_target}\n")
        print(f"Input IDs Length: {len(tokenized_example['input_ids'])}")
        print(f"Labels Length: {len(tokenized_example['labels'])}\n")

# Call the function to check the tokenized data
check_tokenized_data(tokenized_datasets)

from peft import get_peft_model, TaskType, LoraConfig

# LoRA adapters on the attention query ("q") and value ("v") projections
peft_config = LoraConfig(
    task_type=TaskType.SEQ_2_SEQ_LM,
    inference_mode=False,
    r=8,
    lora_alpha=32,
    lora_dropout=0.1,
    target_modules=["q", "v"],
)
model = get_peft_model(model, peft_config)

# you can also try this on the base model instead of get_peft_model:
# model.add_adapter(peft_config, adapter_name="adapter_1")

# Make the embedding outputs require gradients so they can reach the adapters
model.base_model.model.encoder.enable_input_require_grads()
model.base_model.model.decoder.enable_input_require_grads()

model.print_trainable_parameters()
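
To get a feel for why LoRA is cheap, the number of trainable parameters can be estimated by hand: every adapted weight matrix with input size d_in and output size d_out gains two low-rank factors with r * (d_in + d_out) parameters in total. The sketch below assumes the usual base-size T5 dimensions (hidden size 768, attention inner size 768, 12 encoder and 12 decoder layers), which I have not checked against this checkpoint's config, so treat the result as an estimate and not as the output of print_trainable_parameters().

```python
def lora_param_count(d_in, d_out, r):
    """Parameters of one LoRA adapter: A is (r x d_in), B is (d_out x r)."""
    return r * (d_in + d_out)


# Assumed dimensions of a base-size T5-style model (not read from the config)
D_MODEL = 768
ATTENTION_INNER = 768
ENCODER_LAYERS = 12
DECODER_LAYERS = 12
RANK = 8  # r from the LoraConfig above

per_module = lora_param_count(D_MODEL, ATTENTION_INNER, RANK)

# Encoder layers have one attention block; decoder layers have two
# (self-attention and cross-attention). "q" and "v" are adapted in each block.
adapted_modules = 2 * ENCODER_LAYERS + 2 * 2 * DECODER_LAYERS

estimated_trainable = per_module * adapted_modules
print(f"{adapted_modules} adapted modules, about {estimated_trainable:,} trainable parameters")
```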

# Train on Apple's Metal backend (MPS) if available, otherwise on the CPU
import torch

device = "mps" if torch.backends.mps.is_available() else "cpu"
model.to(device)

# Configure the training run
from transformers import TrainingArguments

training_args = TrainingArguments(
    output_dir="./long-t5-tglobal-base",
    logging_dir='./logs',  # Directory to store TensorBoard logs
    logging_steps=100,      # Log every 100 steps (the default is 500)
    evaluation_strategy="steps",  # Named eval_strategy in recent transformers releases
    eval_steps=500,         # Evaluate every 500 steps
    learning_rate=3e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    gradient_accumulation_steps=2,  # Effective batch size of 16 per optimizer step
    num_train_epochs=2,
    weight_decay=0.01,
    save_total_limit=3,
    load_best_model_at_end=True,
)

from transformers import DataCollatorForSeq2Seq

# Pads inputs and labels dynamically per batch
data_collator = DataCollatorForSeq2Seq(tokenizer, model=model)

from transformers import Trainer

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets['train'],
    eval_dataset=tokenized_datasets['valid'],
    data_collator=data_collator,
)

trainer.train()

# Publish the trained LoRA adapter to the Hugging Face Hub
from huggingface_hub import create_repo

create_repo("long-t5-tglobal-base-sql", token="mytoken")

model.push_to_hub("tjooner/long-t5-tglobal-base-sql", token="mytoken")

# Reload the base model and attach the published adapter for inference
import json

import torch
from transformers import AutoTokenizer, LongT5ForConditionalGeneration

base_model_name = "google/long-t5-tglobal-base"
model_path = 'Tjooner/long-t5-tglobal-base-sql'  # Hub repository that holds the adapter
tokenizer = AutoTokenizer.from_pretrained(model_path, token="myprivatetoken")
# Load the base weights by name: `model` holds the model object by now, not the checkpoint name
model = LongT5ForConditionalGeneration.from_pretrained(base_model_name, token="myprivatetoken")
model.load_adapter(model_path)

device = "mps" if torch.backends.mps.is_available() else "cpu"

test = {
    "nl_query": "insert a new board of directors member into the database with values this.age, this.name and this.id", "context":
    {"tables": [
        {
            "table": "book_1", "columns": "author: character varying; bookId: character varying; directors_id: uuid"},
        {
            "table": "directors_1", "columns": "age: numeric; name: character varying; id: uuid;"
        }
    ],
        "foreignKeys": [
        {
            "table": "book", "column": "directors_id", "ref_table": "board of directors", "ref_column": "id"
        }, {
            "table": "movie_41", "column": "book_id", "ref_table": "book", "ref_column": "bookId"
        }
    ]},
    "json_obj": {"age": 5, "name": "red", "id": 5}}

# Convert test to string
test_example = json.dumps(test, separators=(',', ':'))

# Preprocess the test example
inputs = tokenizer(test_example, return_tensors="pt")
inputs = {k: v.to(device) for k, v in inputs.items()}

model.to(device)

# Generate prediction
model.eval()
with torch.no_grad():
    outputs = model.generate(**inputs, max_length=1000)

# Decode the generated ids to text
predicted_sql = tokenizer.decode(outputs[0], skip_special_tokens=True)

print("Original Text:", test_example)
print("Predicted SQL:", predicted_sql)

Final Result

This is the model's output for the test example above:

Original Text: {"nl_query":"insert a new board of directors member into the database with values this.age, this.name and this.id","context":{"tables":[{"table":"book_1","columns":"author: character varying; bookId: character varying; directors_id: uuid"},{"table":"directors_1","columns":"age: numeric; name: character varying; id: uuid;"}],"foreignKeys":[{"table":"book","column":"directors_id","ref_table":"board of directors","ref_column":"id"},{"table":"movie_41","column":"book_id","ref_table":"book","ref_column":"bookId"}]},"json_obj":{"age":5,"name":"red","id":5}}
Predicted SQL: "sql_query":"INSERT INTO board_1 (name, age, id) VALUES ($1, $2, $3, $4)",query_params":["item.age", "item.name", "item.id"]

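As you can see, the generated SQL query is close but not correct. The model invented the table name board_1, although the table is called directors_1. It also emitted four placeholders ($1 to $4) for three columns, and the parameter order (age, name, id) does not match the column order (name, age, id). Finally, the printed output lacks the surrounding braces and one quotation mark, so it is not valid JSON as it stands.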
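
Errors like these can be caught automatically before a generated query reaches the database. The sketch below checks two things: that the placeholders $1..$n match the number of parameters, and that the first table named after INTO, FROM, or UPDATE exists in the context. The function name check_prediction is hypothetical, and the regular expressions are a deliberately simple heuristic, not a SQL parser.

```python
import re


def check_prediction(sql_query, query_params, known_tables):
    """Return a list of problems found in a generated query (empty if none)."""
    problems = []

    # The placeholders should be exactly $1..$n for n parameters
    placeholders = {int(n) for n in re.findall(r"\$(\d+)", sql_query)}
    if placeholders != set(range(1, len(query_params) + 1)):
        problems.append(
            f"placeholders {sorted(placeholders)} do not match {len(query_params)} parameter(s)"
        )

    # Heuristic: only the first table after INTO, FROM, or UPDATE is checked
    match = re.search(
        r"\b(?:INTO|FROM|UPDATE)\s+([A-Za-z_][A-Za-z0-9_]*)", sql_query, re.IGNORECASE
    )
    if match and match.group(1) not in known_tables:
        problems.append(f"unknown table {match.group(1)!r}")

    return problems


problems = check_prediction(
    "INSERT INTO board_1 (name, age, id) VALUES ($1, $2, $3, $4)",
    ["item.age", "item.name", "item.id"],
    known_tables={"book_1", "directors_1"},
)
for problem in problems:
    print(problem)
```

For the prediction above, this reports both the placeholder mismatch and the unknown table. It does not detect the swapped parameter order, which would require comparing column names with parameter names.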