Yummy-Delta
This package provides the Yummy Delta Lake API, written in Rust. The API is built on the delta-rs implementation.
Installation
pip3 install yummy[delta]
Quick start
yummy delta server -h 0.0.0.0 -p 8080 -f /test-yummy/yummy-delta-tests/config.yaml
In the configuration file you can define multiple stores using the supported backends.
config.yaml
stores:
  - name: local
    path: "/tmp/delta-test-1/"
  - name: az
    path: "az://delta-test-1/"
Delta backends
Local file system
/tmp/delta-test-1
AWS S3 / S3-compatible storage
s3://<bucket>/<path>
s3a://<bucket>/<path>
Environment variables: AWS_REGION="us-east-1", AWS_ACCESS_KEY_ID="deltalake", AWS_SECRET_ACCESS_KEY="weloverust", AWS_ENDPOINT_URL=endpoint_url
Azure Blob Storage / Azure Datalake Storage Gen2
az://<container>/<path>
adl://<container>/<path>
abfs://<container>/<path>
Environment variables: AZURE_STORAGE_ACCOUNT_NAME="devstoreaccount1", AZURE_STORAGE_ACCOUNT_KEY="**", AZURE_STORAGE_USE_EMULATOR="false", AZURE_STORAGE_USE_HTTP="true"
Google Cloud Storage
gs://<bucket>/<path>
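The credentials for a cloud-backed store must be present in the environment of the server process before it starts. A minimal sketch in Python, assuming the yummy CLI from the Quick start is on PATH; the account key and config path are placeholders:

import os
import subprocess

# Placeholder Azure credentials for the "az" store defined in config.yaml.
os.environ["AZURE_STORAGE_ACCOUNT_NAME"] = "devstoreaccount1"
os.environ["AZURE_STORAGE_ACCOUNT_KEY"] = "<account-key>"
os.environ["AZURE_STORAGE_USE_EMULATOR"] = "false"
os.environ["AZURE_STORAGE_USE_HTTP"] = "true"

# Start the Yummy Delta server; the child process inherits the variables set above.
server = subprocess.Popen(
    ["yummy", "delta", "server", "-h", "0.0.0.0", "-p", "8080", "-f", "config.yaml"]
)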
REST API
Stores list
Request:
curl -X GET "http://localhost:8080/api/1.0/delta" \
-H "Content-Type: application/json"
Response:
{"stores": [{"path": "/tmp/delta-test-1/", "store": "local"},
            {"path": "az://delta-test-1/", "store": "az"}]}
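The same call from Python, as a minimal sketch using the requests library (not a dependency of this package):

import requests

# List the configured stores; this mirrors the curl call above.
resp = requests.get("http://localhost:8080/api/1.0/delta")
resp.raise_for_status()
for store in resp.json()["stores"]:
    print(store["store"], "->", store["path"])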
Tables list
Request:
curl -X GET "http://localhost:8080/api/1.0/delta/{store_name}" \
-H "Content-Type: application/json"
Response:
{"path": "az://delta-test-1/",
 "store": "az",
 "tables": ["test_delta_1", "test_delta_2", "test_delta_3", "test_delta_4"]}
Table details
Request:
curl -X GET "http://localhost:8080/api/1.0/delta/{store_name}/{table_name}/details" \
-H "Content-Type: application/json"
Response:
{"path": "az://delta-test-1/test_delta_4",
 "schema": {"fields": [{"metadata": {},
                        "name": "col1",
                        "nullable": false,
                        "type": "integer"},
                       {"metadata": {},
                        "name": "col2",
                        "nullable": false,
                        "type": "string"}],
            "type": "struct"},
 "store": "az",
 "table": "test_delta_4",
 "version": 100}
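For example, the current table version and column names can be read straight from the details response (a sketch with requests, using the az store and test_delta_4 table from this README):

import requests

details = requests.get(
    "http://localhost:8080/api/1.0/delta/az/test_delta_4/details"
).json()

print("version:", details["version"])
print("columns:", [f["name"] for f in details["schema"]["fields"]])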
Create delta table
Request:
curl -X POST "http://localhost:8080/api/1.0/delta/{store_name}" \
-H "Content-Type: application/json" \
-d '{
"table": "test_delta_4",
"schema": [
{
"name": "col1",
"type": "integer",
"nullable": false
},
{
"name": "col2",
"type": "string",
"nullable": false
}
],
"partition_columns": ["col2"],
"comment": "Hello from delta"
}'
Response:
{ "table": "test_delta_4" }
Append data
Request:
curl -X POST "http://localhost:8080/api/1.0/delta/{store_name}/{table_name}" \
-H "Content-Type: application/json" \
-d '
{"record_batch_dict": {"col1": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
"col2": ["A", "B", "A", "B", "A", "A", "A", "B", "B", "A", "A"]}}'
Response:
{"table": "test_delta_4", "version": 101}
OR:
Request:
curl -X POST "http://localhost:8080/api/1.0/delta/{store_name}/{table_name}" \
-H "Content-Type: application/json" \
-d '
{"record_batch_list": [
{"col1": 1, "col2": "A"},
{"col1": 2, "col2": "B"},
{"col1": 3, "col2": "A"},
{"col1": 4, "col2": "B"},
{"col1": 5, "col2": "A"},
{"col1": 6, "col2": "A"},
{"col1": 7, "col2": "A"},
{"col1": 8, "col2": "B"},
{"col1": 9, "col2": "B"},
{"col1": 10, "col2": "A"},
{"col1": 11, "col2": "A"}
]
}'
Response:
{"table": "test_delta_4", "version": 101}
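Because record_batch_dict is a plain column-to-values mapping, a pandas DataFrame can be appended by converting it to that shape first (a sketch assuming pandas and requests are installed):

import pandas as pd
import requests

df = pd.DataFrame({"col1": [12, 13], "col2": ["B", "A"]})

# Convert the DataFrame into the record_batch_dict payload shown above.
payload = {"record_batch_dict": df.to_dict(orient="list")}

resp = requests.post("http://localhost:8080/api/1.0/delta/az/test_delta_4", json=payload)
resp.raise_for_status()
print(resp.json())  # e.g. {"table": "test_delta_4", "version": 102}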
Overwrite data
Request:
curl -X PUT "http://localhost:8080/api/1.0/delta/{store_name}/{table_name}" \
-H "Content-Type: application/json" \
-d '
{"record_batch_dict": {"col1": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
"col2": ["A", "B", "A", "B", "A", "A", "A", "B", "B", "A", "A"]}}'
Response:
{"table": "test_delta_4", "version": 101}
OR:
Request:
curl -X PUT "http://localhost:8080/api/1.0/delta/{store_name}/{table_name}" \
-H "Content-Type: application/json" \
-d '
{"record_batch_list": [
{"col1": 1, "col2": "A"},
{"col1": 2, "col2": "B"},
{"col1": 3, "col2": "A"},
{"col1": 4, "col2": "B"},
{"col1": 5, "col2": "A"},
{"col1": 6, "col2": "A"},
{"col1": 7, "col2": "A"},
{"col1": 8, "col2": "B"},
{"col1": 9, "col2": "B"},
{"col1": 10, "col2": "A"},
{"col1": 11, "col2": "A"}
]
}'
Response:
{"table": "test_delta_4", "version": 101}
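Overwrite accepts the same two payload shapes as Append data; only the HTTP method changes from POST to PUT.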
Optimize
Request:
curl -X POST "http://localhost:8080/api/1.0/delta/{store_name}/{table_name}/optimize" \
-H "Content-Type: application/json" \
-d '{
"target_size": 2000000,
"filters": [
{
"column": "col2",
"operator": "=",
"value": "B"
}
]
}'
Response:
{"metrics": {"filesAdded": {"avg": 524.5,
                            "max": 541,
                            "min": 508,
                            "totalFiles": 2,
                            "totalSize": 1049},
             "filesRemoved": {"avg": 506.75,
                              "max": 538,
                              "min": 499,
                              "totalFiles": 36,
                              "totalSize": 18243},
             "numBatches": 1,
             "numFilesAdded": 2,
             "numFilesRemoved": 36,
             "partitionsOptimized": 2,
             "preserveInsertionOrder": true,
             "totalConsideredFiles": 36,
             "totalFilesSkipped": 0}}
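The same compaction can be triggered from Python; the sketch below (using requests) sends the payload from the curl call above and reports how many files were rewritten:

import requests

payload = {
    "target_size": 2000000,
    "filters": [{"column": "col2", "operator": "=", "value": "B"}],
}

metrics = requests.post(
    "http://localhost:8080/api/1.0/delta/az/test_delta_4/optimize", json=payload
).json()["metrics"]

print("files added:", metrics["numFilesAdded"])
print("files removed:", metrics["numFilesRemoved"])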
Vacuum
Request:
curl -X POST "http://localhost:8080/api/1.0/delta/{store_name}/{table_name}/vacuum" \
-H "Content-Type: application/json" \
-d '{
"retention_period_seconds": 0,
"enforce_retention_duration": false,
"dry_run": false
}'
Response:
{"dry_run": false,
 "files_deleted": ["col2=A/part-00000-09fef82b-9208-41be-8769-30c442b36346-c000.snappy.parquet",
                   "col2=A/part-00000-16779a2f-d2f4-450c-b815-3b8fca850402-c000.snappy.parquet",
                   "col2=A/part-00000-26e01b49-a432-4d06-816d-9a2048c0194e-c000.snappy.parquet",
                   "col2=A/part-00000-3ff1f905-87ae-4b73-a3ae-3cd1056171a0-c000.snappy.parquet",
                   "col2=A/part-00000-50dbcfea-22d6-4ef3-bacf-f1371b9186ad-c000.snappy.parquet",
                   "col2=A/part-00000-511d3ceb-0959-4024-9bbe-d049bb4fb4c6-c000.snappy.parquet",
                   "col2=A/part-00000-56e5484f-1722-45eb-84bb-1475b67e9f63-c000.snappy.parquet",
                   "col2=B/part-00000-b15a1cd7-9a15-4ba6-8ba8-e3ca63737f81-c000.snappy.parquet",
                   "col2=B/part-00000-d4fadc68-46b5-4f1f-a378-e9f3dba842f0-c000.snappy.parquet",
                   "col2=B/part-00000-d6e823d1-433e-4798-a641-1e2783c8c1eb-c000.snappy.parquet",
                   "col2=B/part-00000-ded9a5b7-4754-47b7-9e2c-132263cea52e-c000.snappy.parquet",
                   "col2=B/part-00000-fb89f02c-8ea3-47d9-8794-182d8602687c-c000.snappy.parquet"]}
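Because vacuum permanently removes files, it can be safer to call it with dry_run set to true first and inspect the result before deleting anything. A sketch with requests (assuming the dry-run response reports the candidate files under the same files_deleted field):

import requests

url = "http://localhost:8080/api/1.0/delta/az/test_delta_4/vacuum"
payload = {
    "retention_period_seconds": 0,
    "enforce_retention_duration": False,
    "dry_run": True,  # preview only; nothing is deleted yet
}

preview = requests.post(url, json=payload).json()
print("files that would be deleted:", len(preview["files_deleted"]))

# Run the vacuum for real once the preview looks right.
payload["dry_run"] = False
result = requests.post(url, json=payload).json()
print("deleted:", len(result["files_deleted"]))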
Query
Request:
curl -X GET "http://localhost:8080/api/1.0/delta/{store_name}/{table_name}?query={sql}&braces=true" \
-H "Content-Type: application/json"
Response:
[
{"column_name": ["data"], ...} <= record batch
...
]
Example:
# SQL
SELECT CAST(col2 as STRING), SUM(col1), COUNT(col1), MAX(col1), AVG(col1) FROM test_delta_4 GROUP BY col2
# Example response
[{"AVG(test_delta_4.col1)": [6.142857142857143, 5.75],
  "COUNT(test_delta_4.col1)": [231, 132],
  "MAX(test_delta_4.col1)": [11, 9],
  "SUM(test_delta_4.col1)": [1419, 759],
  "test_delta_4.col2": ["A", "B"]}]
The response is returned as a stream, so you can process it in batches. Note that the SQL statement must be URL-encoded before it is placed in the query string. Python example:
import json
from urllib.parse import quote

import aiohttp
import pyarrow as pa

query = "SELECT CAST(col2 as STRING), col1 FROM test_delta_4 ORDER BY col1"
# URL-encode the SQL statement before putting it into the query string
url = f"http://localhost:8080/api/1.0/delta/az/test_delta_4?query={quote(query)}&braces=false"

async def stream(url):
    batches = []
    async with aiohttp.request("GET", url) as r:
        async for data, _ in r.content.iter_chunks():
            d = json.loads(data)                   # each chunk carries one JSON record batch
            batch = pa.RecordBatch.from_pydict(d)  # read the batch into pyarrow
            batches.append(batch)
    return batches

batches = await stream(url)                        # top-level await works in a Jupyter/IPython session
pa.Table.from_batches(batches).to_pandas()         # combine the batches into a pandas DataFrame