I'm trying to migrate database tables from MySQL to SQL Server:
import pandas as pd
from sqlalchemy import create_engine

my_engine = create_engine("mysql+pymysql://root:pass@localhost/gen")
ms_engine = create_engine('mssql+pyodbc://localhost/gen?driver=SQL Server')

for table_name in ['topics', 'fiction', 'compact']:
    for table in pd.read_sql_query('SELECT * FROM %s' % table_name,
                                   my_engine,
                                   chunksize=100000):
        table.to_sql(name=table_name, con=ms_engine, if_exists='append')
I thought that using chunksize would release the memory, but it just keeps growing.
I also tried the garbage collector, but it had no effect.
Maybe my expectations were wrong?
_I'm using Python 3.5.1 with pandas 0.17.1 and all the latest packages, although I also tried Python 2.7 with pandas 0.16, with the same results._
Maybe my expectations were wrong?
Possibly. chunksize is in any case a somewhat misleading keyword: chunksize does not necessarily fetch the data from the database into Python in chunks. By default it will fetch all data into memory at once, and only return the data in chunks (so the conversion to a dataframe happens in chunks). Generally, this is a limitation of the database drivers. The idea is to eventually support streaming results through execution_options, but until then you can set this yourself for the engine you provide to read_sql (but this works only using psycopg2).
Side note: there may be better ways to do a database migration. E.g. the author of SQLAlchemy also has a database migration tool: https://pypi.python.org/pypi/alembic
Thanks for your prompt reply.
So I've been misled by chunksize... I remember using it in the past, but perhaps I understood it wrong and used it with tables that fit in memory...
PG is also an option for me instead of MSSQL, but I'll try alembic first.
Cheers
Example PG parameters for this case would be very nice ;)
In principle, I think it should be something like this:
engine = create_engine("...", execution_options=dict(stream_results=True))
for table in pd.read_sql_query('SELECT * FROM %s' % table_name, engine, chunksize=100000):
    ...
But, I never tested this myself. Would be interesting to hear experiences with it.
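Filling in the placeholders, it could look like the following (again untested; this assumes you are _reading_ from a PostgreSQL database via psycopg2, since that is the only driver where stream_results currently works, and the connection strings, engine names and table list here are just placeholders):
import pandas as pd
from sqlalchemy import create_engine

# stream_results=True makes psycopg2 use a server-side cursor, so rows are
# fetched from the source database incrementally instead of all at once
source_engine = create_engine(
    "postgresql://postgres:postgres@localhost:5432/gen",
    execution_options=dict(stream_results=True),
)
target_engine = create_engine('mssql+pyodbc://localhost/gen?driver=SQL Server')

for table_name in ['topics', 'fiction', 'compact']:
    for chunk in pd.read_sql_query('SELECT * FROM %s' % table_name,
                                   source_engine,
                                   chunksize=100000):
        chunk.to_sql(name=table_name, con=target_engine,
                     if_exists='append', index=False)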
Thanks Joris, you seem like a really nice person.
Unfortunately, your snippet doesn't seem to be enough:
import pandas as pd
from sqlalchemy import create_engine

my_engine = create_engine("mysql+pymysql://root:pass@localhost/gen")
pg_engine = create_engine('postgresql://postgres:postgres@localhost:5432/gen',
                          execution_options=dict(stream_results=True))

# tables: mapping of table names to copy, defined elsewhere
for table_name in tables.keys():
    for table in pd.read_sql('SELECT * FROM %s' % table_name,
                             my_engine,
                             chunksize=10000):
        table.to_sql(name=table_name, con=pg_engine, if_exists='append')
I mean this is just for reference. It would be nice if we could migrate data in chunks to PG just by using pandas.
Anyway, I'm reading up on alembic and I'll post a simple script here if it's as simple as with pandas :)
Ah, but note that the execution_options setting stream_results is for _fetching_ the data. You are fetching the data from MySQL, which does not support this, so the above will still fetch all data into memory at once, and it is normal that you do not see any difference.
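If you want to avoid the buffering on the MySQL side itself, one possible workaround (again, untested by me) might be to ask pymysql for its unbuffered server-side cursor through connect_args; whether this actually keeps memory bounded together with pandas' chunking, I cannot say:
import pymysql
import pandas as pd
from sqlalchemy import create_engine

# SSCursor is pymysql's unbuffered cursor: rows are streamed from the
# server as they are fetched, instead of being buffered client-side
my_engine = create_engine(
    "mysql+pymysql://root:pass@localhost/gen",
    connect_args={"cursorclass": pymysql.cursors.SSCursor},
)

for chunk in pd.read_sql_query('SELECT * FROM topics', my_engine, chunksize=10000):
    # each chunk is a DataFrame of at most 10000 rows
    ...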
Several days later, for reference...
Alembic was too complicated for my concentration. I tried the FME and Navicat apps: the latter didn't manage to migrate all the tables through "Data transfer", and the former migrated successfully, but although the MySQL tables were encoded in UTF-8 it didn't use the nvarchar data type for SQL Server, so I got records with garbage characters. On top of that, no indexes were preserved.
So I used Python (^_^):
#!/usr/bin/env python3
import pandas as pd
from sqlalchemy import create_engine

my_engine = create_engine("mysql+pymysql://root:pass@localhost/gen?charset=utf8")
ms_engine = create_engine('mssql+pyodbc://localhost/gen?driver=SQL Server')

chunksize = 10000

for table_name in ['topics', 'fiction', 'compact']:
    # total number of rows, to know how many chunks to read
    row_count = int(pd.read_sql('SELECT COUNT(*) FROM {table_name}'.format(
        table_name=table_name), my_engine).values)

    # read and append one LIMIT/OFFSET chunk at a time
    for i in range(int(row_count / chunksize) + 1):
        query = 'SELECT * FROM {table_name} LIMIT {offset}, {chunksize}'.format(
            table_name=table_name, offset=i * chunksize, chunksize=chunksize)
        pd.read_sql_query(query, con=my_engine).to_sql(
            name=table_name, con=ms_engine, if_exists='append', index=False)
@tfurmston I notice you removed your comment. But, it was a very useful comment, so if you want, feel free to add it again.
Thank you @klonuo, I'm using your solution for myself.
Question: what if the sqlalchemy engine had another boolean option whereby, if selected together with chunksize, this simple loop with limits is done in the background? Curious what others would think. Or is it better to be explicit, as in @klonuo's solution?
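For concreteness, such an option would basically just hide a small wrapper around @klonuo's explicit loop; something like this purely hypothetical helper (migrate_table is not an existing pandas or sqlalchemy API, and it assumes MySQL's LIMIT offset, count syntax):
import pandas as pd

def migrate_table(table_name, source_engine, target_engine, chunksize=10000):
    # count the rows so we know how many chunks to copy
    row_count = int(pd.read_sql(
        'SELECT COUNT(*) FROM {}'.format(table_name), source_engine).values)
    # copy one LIMIT/OFFSET chunk at a time
    for i in range(row_count // chunksize + 1):
        query = 'SELECT * FROM {} LIMIT {}, {}'.format(
            table_name, i * chunksize, chunksize)
        pd.read_sql_query(query, con=source_engine).to_sql(
            name=table_name, con=target_engine, if_exists='append', index=False)

# e.g. migrate_table('topics', my_engine, ms_engine)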
I see that server side cursors are supported in sqlalchemy now (New in version 1.1.4):
http://docs.sqlalchemy.org/en/latest/dialects/mysql.html#server-side-cursors
I have verified that
engine = create_engine('mysql+pymysql://user:password@domain/database', server_side_cursors=True)
result = engine.execute(sql_query)
result.fetchone()
returns a row immediately (i.e. the client doesn't read the complete table into memory). This should be useful to allow read_sql to read in chunks and avoid memory problems. Passing the parameter chunk to fetchmany:
result.fetchmany(chunk)
should do the trick?
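Roughly, the loop I have in mind would be something like this (a sketch only; I have only verified the single fetchone above, and my_table plus the connection URL are placeholders):
import pandas as pd
from sqlalchemy import create_engine

# server_side_cursors=True (SQLAlchemy >= 1.1.4) makes the MySQL driver use
# an unbuffered cursor, so fetchmany() pulls rows from the server in batches
engine = create_engine('mysql+pymysql://user:password@domain/database',
                       server_side_cursors=True)

chunk = 10000
result = engine.execute('SELECT * FROM my_table')
while True:
    rows = result.fetchmany(chunk)          # at most `chunk` rows at a time
    if not rows:
        break
    df = pd.DataFrame(rows, columns=result.keys())
    # ... process or write out df here ...
result.close()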
I know I'm pretty late to the party, but I use the OFFSET clause in my SQL queries, wrapped inside a for loop, to gather the data in chunks.
@alfonsomhc I have now tried that, and even with results.fetchmany(10) my memory goes above 7 GB (I have also tried with execution_options={"stream_results": True}).