programing

SQLAlchemy ORM을 사용한 대량 삽입

nasanasas 2020. 8. 20. 18:56
반응형

SQLAlchemy ORM을 사용한 대량 삽입


SQLAlchemy가 각 개별 개체를 삽입하는 대신 대량 삽입을 수행하도록하는 방법이 있습니까? 즉,

하기:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

보다는 :

INSERT INTO `foo` (`bar`) VALUES (1)
INSERT INTO `foo` (`bar`) VALUES (2)
INSERT INTO `foo` (`bar`) VALUES (3)

방금 원시 SQL이 아닌 sqlalchemy를 사용하도록 일부 코드를 변환했으며 이제는 작업하는 것이 훨씬 더 좋아졌지만 (최대 10 배까지) 느려진 것 같지만 이것이 이유인지 궁금합니다.

세션을 더 효율적으로 사용하여 상황을 개선 할 수 있습니다. 현재 나는 몇 가지를 추가 autoCommit=Falsesession.commit()후에 하고 있습니다. 이로 인해 DB가 다른 곳에서 변경되면 데이터가 오래된 것처럼 보이지만 새 쿼리를 수행하더라도 여전히 이전 결과를 다시 얻을 수 있습니까?

당신의 도움을 주셔서 감사합니다!


SQLAlchemy는 다음 버전을 도입했습니다 1.0.0.

대량 작업-SQLAlchemy 문서

이러한 작업을 통해 이제 대량 삽입 또는 업데이트를 수행 할 수 있습니다!

예를 들어 다음을 수행 할 수 있습니다.

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

여기에서 대량 삽입이 만들어집니다.


내가 아는 한 ORM이 대량 삽입을 발행하는 방법은 없습니다. 근본적인 이유는 SQLAlchemy가 각 개체의 ID (즉, 새 기본 키)를 추적해야하고 대량 삽입이이를 방해하기 때문이라고 생각합니다. 예를 들어 foo테이블에 id열이 있고 Foo클래스에 매핑 되어 있다고 가정합니다 .

x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1

SQLAlchemy는 x.id다른 쿼리를 실행하지 않고 값을 선택했기 때문에 INSERT에서 직접 값을 얻었음을 추론 할 수 있습니다 . 동일한 인스턴스 를 통해 생성 된 객체에 대한 후속 액세스가 필요하지 않은 경우 삽입을 위해 ORM 레이어를 건너 뛸 수 있습니다.

Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))

SQLAlchemy는 이러한 새 행을 기존 개체와 일치시킬 수 없으므로 후속 작업에 대해 새로 쿼리해야합니다.

오래된 데이터에 관한 한, 세션에는 데이터베이스가 세션 외부에서 변경되는시기를 알 수있는 기본 제공 방법이 없다는 것을 기억하는 것이 좋습니다. 기존 인스턴스를 통해 외부에서 수정 된 데이터에 액세스하려면 인스턴스가 만료 됨으로 표시되어야합니다 . 이것은 기본적으로 발생 session.commit()하지만, 호출하여 수동으로 수행 할 수 있습니다 session.expire_all()또는 session.expire(instance). 예 (SQL 생략) :

x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42

session.commit()expires x이므로 첫 번째 print 문은 암시 적으로 새 트랜잭션을 열고 x의 속성을 다시 쿼리 합니다. 첫 번째 print 문을 주석 처리하면 새 쿼리가 업데이트 이후까지 생성되지 않기 때문에 두 번째 문이 올바른 값을 선택한다는 것을 알 수 있습니다.

이것은 트랜잭션 격리의 관점에서 의미가 있습니다. 트랜잭션 사이의 외부 수정 만 선택해야합니다. 이로 인해 문제가 발생하는 경우 즉시 .NET Framework에 도달하는 대신 애플리케이션의 트랜잭션 경계를 명확히하거나 다시 생각하는 것이 좋습니다 session.expire_all().


sqlalchemy 문서에는 대량 삽입에 사용할 수있는 다양한 기술의 성능에 대한 이 있습니다.

ORM은 기본적으로 고성능 대량 삽입을위한 것이 아닙니다. 이것이 SQLAlchemy가 최고급 구성 요소로 ORM 외에도 Core를 제공하는 전체 이유입니다.

빠른 대량 삽입 사용 사례의 경우 ORM이 기반으로 구축하는 SQL 생성 및 실행 시스템이 Core의 일부입니다. 이 시스템을 직접 사용하면 원시 데이터베이스 API를 직접 사용하는 것과 경쟁하는 INSERT를 생성 할 수 있습니다.

또는 SQLAlchemy ORM은 소량의 ORM 기반 자동화로 코어 수준 INSERT 및 UPDATE 구문을 생성하기 위해 작업 단위 프로세스의 하위 섹션에 후크를 제공하는 대량 작업 메서드 모음을 제공합니다.

아래 예는 가장 자동화 된 것에서 가장 적은 것까지 행을 삽입하는 여러 가지 방법에 대한 시간 기반 테스트를 보여줍니다. cPython 2.7에서 런타임은 다음을 관찰했습니다.

classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec

스크립트:

import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String,  create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_pk_given(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer(id=i+1, name="NAME " + str(i))
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM pk given: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_insert(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    n1 = n
    while n1 > 0:
        n1 = n1 - 10000
        DBSession.bulk_insert_mappings(
            Customer,
            [
                dict(name="NAME " + str(i))
                for i in xrange(min(10000, n1))
            ]
        )
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )
    print(
        "SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def init_sqlite3(dbname):
    conn = sqlite3.connect(dbname)
    c = conn.cursor()
    c.execute("DROP TABLE IF EXISTS customer")
    c.execute(
        "CREATE TABLE customer (id INTEGER NOT NULL, "
        "name VARCHAR(255), PRIMARY KEY(id))")
    conn.commit()
    return conn


def test_sqlite3(n=100000, dbname='sqlite3.db'):
    conn = init_sqlite3(dbname)
    c = conn.cursor()
    t0 = time.time()
    for i in xrange(n):
        row = ('NAME ' + str(i),)
        c.execute("INSERT INTO customer (name) VALUES (?)", row)
    conn.commit()
    print(
        "sqlite3: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " sec")

if __name__ == '__main__':
    test_sqlalchemy_orm(100000)
    test_sqlalchemy_orm_pk_given(100000)
    test_sqlalchemy_orm_bulk_insert(100000)
    test_sqlalchemy_core(100000)
    test_sqlite3(100000)

I usually do it using add_all.

from app import session
from models import User

objects = [User(name="u1"), User(name="u2"), User(name="u3")]
session.add_all(objects)
session.commit()

Direct support was added to SQLAlchemy as of version 0.8

As per the docs, connection.execute(table.insert().values(data)) should do the trick. (Note that this is not the same as connection.execute(table.insert(), data) which results in many individual row inserts via a call to executemany). On anything but a local connection the difference in performance can be enormous.


SQLAlchemy introduced that in version 1.0.0:

Bulk operations - SQLAlchemy docs

With these operations, you can now do bulk inserts or updates!

For instance (if you want the lowest overhead for simple table INSERTs), you can use Session.bulk_insert_mappings():

loadme = [
        (1, 'a')
    ,   (2, 'b')
    ,   (3, 'c')
    ]

dicts = []
for i in range(len(loadme)):
    dicts.append(dict(bar=loadme[i][0], fly=loadme[i][1]))

s = Session()
s.bulk_insert_mappings(Foo, dicts)
s.commit()

Or, if you want, skip the loadme tuples and write the dictionaries directly into dicts (but I find it easier to leave all the wordiness out of the data and load up a list of dictionaries in a loop).


Piere's answer is correct but one issue is that bulk_save_objects by default does not return the primary keys of the objects, if that is of concern to you. Set return_defaults to True to get this behavior.

The documentation is here.

foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')]
session.bulk_save_objects(foos, return_defaults=True)
for foo in foos:
    assert foo.id is not None
session.commit()

This is a way:

values = [1, 2, 3]
Foo.__table__.insert().execute([{'bar': x} for x in values])

This will insert like this:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3)

Reference: The SQLAlchemy FAQ includes benchmarks for various commit methods.


All Roads Lead to Rome, but some of them crosses mountains, requires ferries but if you want to get there quickly just take the motorway.


In this case the motorway is to use the execute_batch() feature of psycopg2. The documentation says it the best:

The current implementation of executemany() is (using an extremely charitable understatement) not particularly performing. These functions can be used to speed up the repeated execution of a statement against a set of parameters. By reducing the number of server roundtrips the performance can be orders of magnitude better than using executemany().

In my own test execute_batch() is approximately twice as fast as executemany(), and gives the option to configure the page_size for further tweaking (if you want to squeeze the last 2-3% of performance out of the driver).

The same feature can easily be enabled if you are using SQLAlchemy by setting use_batch_mode=True as a parameter when you instantiate the engine with create_engine()


The best answer I found so far was in sqlalchemy documentation:

http://docs.sqlalchemy.org/en/latest/faq/performance.html#i-m-inserting-400-000-rows-with-the-orm-and-it-s-really-slow

There is a complete example of a benchmark of possible solutions.

As shown in the documentation:

bulk_save_objects is not the best solution but it performance are correct.

The second best implementation in terms of readability I think was with the SQLAlchemy Core:

def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
            [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )

The context of this function is given in the documentation article.

참고URL : https://stackoverflow.com/questions/3659142/bulk-insert-with-sqlalchemy-orm

반응형