We are trying to figure out how to increase the possible connection pool size for the BigQuery client. I'm not sure if this is a bug, but doing the below for https and http had no effect.
Thanks in advance!
API: BigQuery
Environment: running on a Docker python:3.6-slim image
Version: google-cloud-bigquery==1.12.1

Steps to reproduce:
1. Open a client connection.
2. Start multiple load jobs in parallel threads.
3. Check the output.
from google.cloud import bigquery

bq = bigquery.Client(project=project)
bq._http.adapters['https://']._pool_connections = 100

load_job = bq.load_table_from_uri(
    source_uris=f'gs://{bucket.name}/dart/{extraction_name}/data/{seg_id}_*',
    destination=bigquery.TableReference.from_string(f'{project}.{dataset_id}.{table_id}'),
    job_config=job_config)
{{connectionpool.py:275}} WARNING - Connection pool is full, discarding connection: www.googleapis.com
The requests.adapters.HTTPAdapter class takes three pool-related parameters:

- pool_connections: the number of urllib3 connection pools to cache
- pool_maxsize: the maximum number of connections to keep in a given pool
- pool_block: whether to block when requesting another connection after pool_maxsize is reached

In your case, the warning is occurring because you have the default pool_maxsize (10) and pool_block is False (the default), so the extra connections are being discarded rather than returned to the pool. You need to configure pool_maxsize, rather than (or in addition to) pool_connections.
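For reference, a minimal requests-level sketch (nothing BigQuery-specific here) of how these three parameters are set when the adapter is constructed:

import requests
from requests.adapters import HTTPAdapter

session = requests.Session()
# Up to 100 cached pools and 100 connections per pool; block rather than
# discard when a pool is exhausted.
adapter = HTTPAdapter(pool_connections=100, pool_maxsize=100, pool_block=True)
session.mount('https://', adapter)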
Okay, thanks.
Now I understand the difference; however, I've tried all of these with no change in behavior.
bq._http.adapters['https://']._pool_connections = 200
bq._http.adapters['https://']._pool_maxsize = 200
bq._http.adapters['http://']._pool_maxsize = 200
bq._http.adapters['http://']._pool_connections = 200
bq._http_internal.adapters['https://']._pool_maxsize = 100
bq._http_internal.adapters['https://']._pool_connections = 100
bq._http_internal.adapters['http://']._pool_maxsize = 100
bq._http_internal.adapters['http://']._pool_connections = 100
For example, if I use 16 threads, I encounter the same warnings. Is there a different syntax I should be using here, other than directly modifying these attributes?
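For what it's worth, HTTPAdapter reads these values only once, in its constructor (they are passed to init_poolmanager, which builds the underlying urllib3 PoolManager), so mutating the private attributes afterwards never reaches the pools that already exist. A hedged sketch of re-mounting a fresh adapter instead, assuming bq._http is a requests.Session (or subclass):

from requests.adapters import HTTPAdapter

# Mount a new adapter rather than mutating the old one's private fields;
# pool sizes are fixed when the adapter's PoolManager is built.
bq._http.mount('https://', HTTPAdapter(pool_connections=200, pool_maxsize=200))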
I would also like to know if there is a more convenient/canonical way to adjust these parameters.
The HTTP connection pool is the right solution, but I couldn't get it working, and it seems experimental from the docs.
I am using this workaround: it spawns threads, and each thread gets its own new bq client object. You have to limit the thread count to what your underlying hardware can support, but this can run more than the 10 concurrent queries permitted by the default connection pool manager.
Thought I'd share; maybe this helps you.
import logging
import threading
from time import sleep

from google.cloud import bigquery

# request_json (the incoming request payload) and tables_to_create are
# assumed to be defined elsewhere in the calling context.

def multithread_bq_calls(query_strs, dest_tables, job_configs, max_threads):
    # Setting up threading (value here specifies the max threads)
    pool_sema = threading.BoundedSemaphore(value=max_threads)
    jobs_done = dict()

    def make_bq_call(query_str, dest_table, job_config, pool_sema):
        logging.debug(f'Enter: {query_str}')
        with pool_sema:
            bqclient = bigquery.Client()  # a new instance, hence its own pool of 10
            if job_config is None:
                job_config = bigquery.QueryJobConfig()
            if dest_table is not None:  # only set a destination when one was supplied
                table_ref = bqclient.dataset(
                    request_json['dataset_name'],
                    project=request_json['project_name']).table(dest_table)
                job_config.destination = table_ref
                job_config.write_disposition = 'WRITE_APPEND'  # if the table exists, append to it
            job = bqclient.query(
                query_str,
                job_config=job_config,
            )
            job.result()
            jobs_done[job.job_id] = True
            logging.debug(f'Done: {job.job_id} - total: {sum(jobs_done.values())} of {len(jobs_done)} finished')

    # Start all the threads - pool_sema ensures only max_threads (30 here) are active at any time
    for k, query_str in query_strs.items():
        job_config = job_configs.get(k, None)
        dest_table = dest_tables.get(k, None)
        t = threading.Thread(
            target=make_bq_call,
            args=(query_str, dest_table, job_config, pool_sema),
        )
        t.start()

    # Wait for all the threads to be done before moving forward
    while sum(jobs_done.values()) < len(query_strs):
        logging.info(
            f"Waiting for jobs to finish - {sum(jobs_done.values())} of {len(query_strs)} finished"
        )
        sleep(1)
    logging.info(f"Done - {sum(jobs_done.values())} of {len(query_strs)} finished")
    return sum(jobs_done.values())

# Creating multiple query strings and storing them in a dict
del_query_strs = {}
temp_json = request_json.copy()
delete_query_str = '''
DELETE FROM `{project_name}.{dataset_name}.{table_name}`
WHERE timestamp = "{date}"
'''
for t in tables_to_create:
    temp_json['table_name'] = t
    query_str = delete_query_str.format_map(temp_json)
    del_query_strs[t] = query_str

dest_tables = dict()
job_configs = dict()
max_threads = 30
multithread_bq_calls(del_query_strs, dest_tables, job_configs, max_threads)
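One caveat with this approach: every thread constructs its own Client, so each thread resolves credentials and gets its own default pool of 10 connections. That sidesteps the shared-pool limit at the cost of per-thread setup overhead.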
Confirming this bug is still present.
It appears that these values are not changeable after initialization.
I'm trying to hack my own _http object in there during init by doing something like:
from requests.adapters import HTTPAdapter
from google.cloud.bigquery import Client

client = Client(
    project_id,
    _http=HTTPAdapter(
        pool_connections=100,
        pool_maxsize=100,
        max_retries=10,
        pool_block=False)
)
It lets me initialize the client like this, but as soon as I try to use it, I get:
AttributeError: 'HTTPAdapter' object has no attribute 'request'
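That error is expected: the _http parameter wants a requests.Session-like object (something with a .request method), not an HTTPAdapter. A hedged sketch of one way around it, assuming application-default credentials, is to mount the adapter on a google-auth AuthorizedSession and pass that session in (keeping in mind that _http is a private, unsupported parameter and may change):

import google.auth
from google.auth.transport.requests import AuthorizedSession
from google.cloud.bigquery import Client
from requests.adapters import HTTPAdapter

credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])

# The adapter is mounted on a Session; the Session (which does have a
# .request method) is what gets passed as _http.
session = AuthorizedSession(credentials)
session.mount('https://', HTTPAdapter(pool_connections=100, pool_maxsize=100))

client = Client(project_id, _http=session)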
Current development on the Python google-cloud-bigquery client library has moved to the https://github.com/googleapis/python-bigquery/ repository. Please follow up there.