Hi,
I have a use case where I need to write a UDF to decrypt a raw value. For instance, the query would look like:
my_table:
id(int) | encrypted_value(varchar) | ...
1 | x3vsdf.sdsdf.sdfs3.kjdfkL | ...
2 ...
SELECT id, decrypt_udf(encrypted_value) FROM my_table
The tricky part is that decrypt_udf requires contacting an external service (an RPC that sends encrypted_value and receives the decrypted value). Therefore, I'd need to initialize a connection when the plugin is loaded and persist it, so that we don't pay the cost of connection initialization each time decrypt_udf is called by a worker.
I see the simple scalar UDFs are all implemented as static functions. Is it correct to assume our statically initialized connection will be reused whenever decrypt_udf is called? Basically, I am trying to figure out where to keep my connection and do the initialization so that it's not repeated on each call.
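For concreteness, here is a rough sketch of the kind of scalar function I have in mind (DecryptClient and its connect/decrypt methods are placeholders for my RPC client, and the SPI package names may differ slightly between Presto versions):

import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

public final class DecryptFunction
{
    // Hypothetical RPC client, held in a static field so it is created once per
    // worker JVM (when the class is loaded) rather than once per call.
    private static final DecryptClient CLIENT = DecryptClient.connect("decrypt-service:9090");

    private DecryptFunction() {}

    @ScalarFunction("decrypt_udf")
    @Description("Decrypts a value by calling an external decrypt service")
    @SqlType(StandardTypes.VARCHAR)
    public static Slice decryptUdf(@SqlType(StandardTypes.VARCHAR) Slice encryptedValue)
    {
        // One blocking RPC per row -- exactly the per-call cost I'd like to batch away.
        return Slices.utf8Slice(CLIENT.decrypt(encryptedValue.toStringUtf8()));
    }
}

Is a static field like CLIENT the right place to keep that connection?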
Last question: obviously it's much more efficient to batch multiple decrypt requests into one RPC as opposed to sending them one at a time. Imagine my query is:
SELECT id, decrypt_udf(encrypted_value) FROM my_table LIMIT 100
This means decrypt_udf will be called 100 times. Do Presto workers call decrypt_udf sequentially, or is there any concurrency going on? If the calls are concurrent, I can keep the values in memory, batch them, and send them all together (blocking decrypt_udf until the result is read). If not, what's the proper way to batch these calls? Is a UDF even the right approach?
Thank you so much for your help.
In general we strongly discourage UDFs from contacting external services. Scalar functions are generally assumed to be quick and efficient. Making RPC calls is very far from that assumption, and the implications of slow UDFs are not well tested in Presto. One way to work around this is to use the thrift connector. So instead of using decrypt_udf(encrypted_value), you can write something like
SELECT id, decrypted_value
FROM my_table t
JOIN decrypt_service d
on t.encrypted_value = d.encrypted_value
where decrypt_service is a table backed by the thrift connector.
We are exploring ideas on supporting external service UDFs, but that's probably a late 2019 or 2020 effort.
Thanks a lot for your valuable feedback @rongrong. I think I am gonna start experimenting with the thrift connector; it does sound like what I am looking for.
Is there an issue or draft for supporting external service udfs? I would like to track/contribute.
We are still working on extending the core engine to support dynamic functions through function namespaces. You can follow that work in #9613. @cemcayiroglu is working on potentially simplifying the thrift connector for the use case you mentioned here (so instead of implementing all the APIs for the thrift connector, maybe you only need to implement two). And we were literally just thinking that maybe once we have the dynamic function framework built up, we can introduce a thrift-backed function namespace. So at this moment it's just future possibilities, nothing concrete yet.
Hi Rongrong, I looked at the Thrift connector interface and, like you mentioned, it needs a bit of simplification. A lot of the APIs around splits don't really make sense in this use case. I haven't dug deep into the code; maybe I could return something fixed there each time or try to model my service somehow in terms of splits, but what I really need is a way to get input into getRows so that I can send it to my service, plus listTables / getTableMetadata. I understand support for dynamic functions isn't coming anytime soon, but I am trying to find a sensible way forward so I can get my service available in Presto.
Another idea that came to mind was to just write a brand new connector, something similar to the Redis connector, since my service is essentially a key/value lookup service too, so it should be very similar.
Finally, my last crazy idea was to write a basic connector with two stored procedures, one exposing the table and one exposing decrypt_rows, similar to the kill_query procedure that the system connector exposes. I am just not sure if stored procedures can be used to manipulate data. Something like:
SELECT id, decrypted_value
FROM my_table t
JOIN decrypt_service.decrypt_table() d
on t.encrypted_value = d.encrypted_value(t.encrypted_value)
thoughts?
I can't see how the stored procedures idea could work. Implementing this as a thrift connector is a lot of work, which is why we are thinking of providing a simpler interface on top of it to support this use case. Since you would only run a JOIN with this thrift connector, theoretically you only need to implement the index join API (getIndexSplits) plus the metadata APIs. Though implementing this might still not be trivial, which is why @cemcayiroglu is looking into how to simplify it for this use case.
My hesitation about using the Thrift connector is that I will have to run Thrift servers (correct me if I am wrong), which means I will need to either run separate servers for my service or change my service altogether. Further, there would be a lot of unnecessary serialization/deserialization in between, which will slow down the whole thing.
Wouldn't my easiest bet then be to just write a simple custom connector similar to the Redis connector?
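To make sure I understand the shape of that, the entry point would be something like the sketch below (DecryptServiceConnectorFactory is a placeholder; I'd wire up the metadata, split manager, and record set provider to do the key/value lookup against my service, the way the Redis connector does):

import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.google.common.collect.ImmutableList;

public class DecryptServicePlugin implements Plugin
{
    @Override
    public Iterable<ConnectorFactory> getConnectorFactories()
    {
        // The factory creates the connector that exposes decrypt_service as a
        // queryable table backed by the external key/value lookup service.
        return ImmutableList.of(new DecryptServiceConnectorFactory());
    }
}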
You have a good point about the extra serialization/deserialization, but I think it is not avoidable. However, a key-value connector would have a dependency on a particular key-value store (maybe Redis), and we are looking for a more general-purpose interface. That said, the new thrift interface will have a similar key-value lookup.
👋🏻 Hi Cem, you are absolutely correct; generally speaking, it means depending on a particular key-value store. However, I wasn't proposing anything around the general Thrift interface; I was just trying to understand the best way to address my use case.
Using thrift not only adds extra serialization/deserialization, it also adds network latency (the Thrift server RTT) and another point of failure to my whole workflow: Presto Worker --> Thrift Server --> My external Server (unless my external service implements the thrift interface itself, which requires changes to our external service). We'll need to run this at scale, and I am not sure this flow would bring us any value as opposed to hiding the complexity of talking to the external server inside an ad hoc custom interface.
If I could have my wish, the thrift connector would allow me to implement the server locally (per worker) in addition to supporting dedicated thrift servers.
Hi! Got it! Implementing a custom key-value connector may give you the best performance for your use case, but your use case is very valuable to this feature as well. I am looking into implementing a general-purpose solution that would also address it. You have a good point about the network and CPU overhead. Maybe we can come up with a solution that respects data locality, so the thrift servers can be colocated with the worker nodes. Maybe in later releases we can start thinking about how to bypass the thrift interface. What do you think?
Thanks for your insight, Cem. Right, I think it'd be a great feature if thrift could respect data locality and be colocated with the workers. I am gonna go with the custom key-value-style connector for now, but I would like to follow your work on this. Is there a project brief or ticket to track it?
If you are worried about the reliability of the introduced thrift server, running it on the worker would be even more dangerous, because its reliability would then also affect the worker's reliability. One of the primary motivations for moving code that contacts external services out of the worker is to decouple failure domains. While running the RPC on the worker would save serialization/deserialization cost, it does not necessarily avoid any of the issues you'd otherwise run into regarding scalability and reliability.
@bshafiee Not yet. I am going to create one when I have a more concrete idea about the design.
@rongrong good point. But I think we should have the option for both deployment models. Generally speaking, we need to think about data locality: the Thrift server could be colocated with the workers or run in the same rack.
So we'd want to optimize for the usual case, which is calling the external service and things going smoothly. Now, I see your point about affecting the worker, but two things come to mind:
1) That should be the exceptional case, not the usual one.
2) The failures there really shouldn't affect the worker; it should be the same as if the worker called the Thrift server and it either didn't reply or returned a failure (maybe give the thrift implementer an X-second deadline to fulfill the request and time out after that, and wrap the whole thing inside a try-catch?).
If you are concerned about adding extra load on the worker nodes, that's understandable, but the meat of the work is done in the actual external server, and the code that runs on the local worker is basically a client calling the remote server (which is the same thing the thrift client would have done).
So TL;DR: I'd say as long as the failure is properly boxed and handled by the worker, it doesn't really add any new reliability issues; if anything, it cuts them down.
The benefits of this approach would be:
1) there is no need to have an extra network hop and serialization/deserialization
2) no need to maintain an extra set of specialized thrift servers
@bshafiee I have the exact same issue and would like to know how you have handled it so far. I am trying to use the AWS KMS API to make decrypt calls. Even caching the keys isn't working as expected. I was also advised to try instance functions instead of static ones.
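For reference, this is roughly the kind of caching I am attempting: a minimal sketch assuming the ciphertext arrives base64-encoded, using a static AWS SDK v1 KMS client plus a Guava cache (class and function names are placeholders, and SPI package names may vary by Presto version):

import com.amazonaws.services.kms.AWSKMS;
import com.amazonaws.services.kms.AWSKMSClientBuilder;
import com.amazonaws.services.kms.model.DecryptRequest;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.type.StandardTypes;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.concurrent.ExecutionException;

public final class KmsDecryptFunction
{
    // One KMS client per worker JVM, created when the class is loaded.
    private static final AWSKMS KMS = AWSKMSClientBuilder.standard().build();

    // Cache plaintext by ciphertext so repeated values hit KMS only once.
    private static final Cache<String, String> CACHE =
            CacheBuilder.newBuilder().maximumSize(10_000).build();

    private KmsDecryptFunction() {}

    @ScalarFunction("kms_decrypt")
    @SqlType(StandardTypes.VARCHAR)
    public static Slice kmsDecrypt(@SqlType(StandardTypes.VARCHAR) Slice encryptedValue)
    {
        String ciphertext = encryptedValue.toStringUtf8();
        try {
            String plaintext = CACHE.get(ciphertext, () -> {
                // Assumes the column stores the KMS ciphertext blob as base64 text.
                DecryptRequest request = new DecryptRequest()
                        .withCiphertextBlob(ByteBuffer.wrap(Base64.getDecoder().decode(ciphertext)));
                ByteBuffer result = KMS.decrypt(request).getPlaintext();
                return StandardCharsets.UTF_8.decode(result).toString();
            });
            return Slices.utf8Slice(plaintext);
        }
        catch (ExecutionException e) {
            throw new RuntimeException("KMS decrypt failed", e);
        }
    }
}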