diff --git a/Pipfile.lock b/Pipfile.lock index f2e868c9..7eec1f27 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "e3eb385c3342f53cdb1793dc2055ec8f42669a15854b358632a5ceaa9dc1135b" + "sha256": "8d6521c10626550c727da281b89d0b6df5d31eb7dc5fe6e7aadf3de58bae44a7" }, "pipfile-spec": 6, "requires": { @@ -18,9 +18,9 @@ "default": { "asks": { "hashes": [ - "sha256:d76a6314ecd7d2f920d2e94b8d7bcbb7a0941aa4c915874869c503c757088df2" + "sha256:c3fc1115dfeb414ef0863da6f60f02aea7487f92f76b645738774bf93e8577de" ], - "version": "==1.3.11" + "version": "==2.0.0" }, "async-generator": { "hashes": [ @@ -31,10 +31,10 @@ }, "attrs": { "hashes": [ - "sha256:1c7960ccfd6a005cd9f7ba884e6316b5e430a3f1a6c37c5f87d8b43f83b54ec9", - "sha256:a17a9573a6f475c99b551c0e0a812707ddda1ec9653bed04c13841404ed6f450" + "sha256:4b90b09eeeb9b88c35bc642cbac057e45a5fd85367b985bd2809c62b7b939265", + "sha256:e0d0eb91441a3b53dab4d9b743eafc1ac44476296a2053b6ca3af0b139faf87b" ], - "version": "==17.4.0" + "version": "==18.1.0" }, "click": { "hashes": [ @@ -52,46 +52,47 @@ }, "contextvars": { "hashes": [ - "sha256:e9f9c5763d5a2afa6e420218e53954bd3829fc2d8acd806b98139d28c362cdf9" + "sha256:7d73f8b1426cf0200fbe16900fcd73c9be29c54546b48a3980727e670b1acb10" ], "markers": "python_version < '3.7'", - "version": "==2.1" + "version": "==2.2" }, "cython": { "hashes": [ - "sha256:03db8c1b8120039f72493b95494a595be13b01b6860cfc93e2a651a001847b3b", - "sha256:0d2ccb812d73e67557fd16e7aa7bc5bac18933c1dfe306133cd0680ccab89f33", - "sha256:24f8ea864de733f5a447896cbeec2cac212247e33272539670b9f466f43f23db", - "sha256:30a8fd029eb932a7b5a74e158316d1d069ccb67a8607aa7b6c4ed19fab7fbd4a", - "sha256:37e680901e6a4b97ab67717f9b43fc58542cd10a77431efd2d8801d21d5a37d4", - "sha256:4984e097bc9da37862d97c1f66dacf2c80fadaea488d96ba0b5ea9d84dbc7521", - "sha256:4cfda677227af41e4502e088ee9875e71922238a207d0c40785a0fb09c703c21", - "sha256:4ec60a4086a175a81b9258f810440a6dd2671aa4b419d8248546d85a7de6a93f", - "sha256:51c7d48ea4cba532d11a6d128ebbc15373013f816e5d1c3a3946650b582a30b8", - "sha256:634e2f10fc8d026c633cffacb45cd8f4582149fa68e1428124e762dbc566e68a", - "sha256:67e0359709c8addc3ecb19e1dec6d84d67647e3906da618b953001f6d4480275", - "sha256:6a93d4ba0461edc7a359241f4ebbaa8f9bc9490b3540a8dd0460bef8c2c706db", - "sha256:6ba89d56c3ee45716378cda4f0490c3abe1edf79dce8b997f31608b14748a52b", - "sha256:6ca5436d470584ba6fd399a802c9d0bcf76cf1edb0123725a4de2f0048f9fa07", - "sha256:7656895cdd59d56dd4ed326d1ee9ede727020d4a5d8778a05af2d8e25af4b13d", - "sha256:85f7432776870d65639fed00f951a3c05ef1e534bc72a73cd1200d79b9a7d7d0", - "sha256:96dd674e72281d3feed74fd5adcf0514ba02884f123cdf4fb78567e7be6b1694", - "sha256:97bf06a89bcf9e8d7633cde89274d42b3b661dc974b58fca066fad762e46b4d8", - "sha256:9a465e7296a4629139be5d2015577f2ae5e08196eb7dc4c407beea130f362dc3", - "sha256:9a60355edca1cc9006be086e2633e190542aad2bf9e46948792a48b3ae28ed97", - "sha256:9eab3696f2cb88167db109d737c787fb9dd34ca414bd1e0c424e307956e02c94", - "sha256:c3ae7d40ebceb0d944dfeeceaf1fbf17e528f5327d97b008a8623ddddd1ecee3", - "sha256:c623d19fcc60ea27882f20cf484218926ddf6f978b958dae1070600a1974f809", - "sha256:c719a6e86d7c737afcc9729994f76b284d1c512099ee803eff11c2a9e6e33a42", - "sha256:cf17af0433218a1e33dc6f3069dd9e7cd0c80fe505972c3acd548e25f67973fd", - "sha256:daf96e0d232605e979995795f62ffd24c5c6ecea4526e4cbb86d80f01da954b2", - "sha256:db40de7d03842d3c4625028a74189ade52b27f8efaeb0d2ca06474f57e0813b2", - "sha256:deea1ef59445568dd7738fa3913aea7747e4927ff4ae3c10737844b8a5dd3e22", - "sha256:e05d28b5ce1ee5939d83e50344980659688ecaed65c5e10214d817ecf5d1fe6a", - "sha256:f5f6694ce668eb7a9b59550bfe4265258809c9b0665c206b26d697df2eef2a8b" + "sha256:0344e9352b0915910e212c38403b63f902ce1cba75dde7a43a9112ff960eb2a5", + "sha256:0a390c39e912fc5f82d5feae2d16ea061971407099e1efb0fecb255cb96fbeff", + "sha256:0f2b2e09f94c498f555935e732b7321b5f62f00e7a789238f6c5ddd66987a54d", + "sha256:15614592616b6dd5e919e158796350ebeba6cb6b5d2998cfff41b53f568c8355", + "sha256:1aae6d6e9858888144cea147eb5e677830f45faaff3d305d77378c3cba55f526", + "sha256:200583297f23e558744bc4688d8a2b2605ab6ad7d1494a9fd8c8094ad65ebf3c", + "sha256:295facc211a6b55db9979455b856180f2839be22ab767ffdea55986bee83ca9f", + "sha256:36c16bf39280fe857213d8da31c07a6179d3878c3dc2e435dce0974b9f8f0729", + "sha256:3fef8dfa9cf86ab7814ca31369374ddd5b9524f54406aa83b53b5937965b8e88", + "sha256:439d233d3214e3d69c033a9a93516758f2c8a03e83ea51ae14b6eed13687d224", + "sha256:455ab39c6c0849a6c008fcdf2fae42475f18d0801a3be229e8f75367bbe3b325", + "sha256:56821e3791209e6a11992e294afbf7e3dcda7d4fd54d06396dd521928d3d14fe", + "sha256:62b594584889b33bbea7e71f9d7c5c6539091b341334ef7ca1ae7e30a9dd3e15", + "sha256:70f81a75fb25c1c3c61843e3a6fe771a76c4ebf4d154455a7eff0740ad47dff4", + "sha256:8011090beb09251cb4ece1e14263e574b38eda696b788552b369ad343373d0e9", + "sha256:80d6a0369333a162fc32a22637f5870f3e87fb038c7b58860bbe00b05b58aa62", + "sha256:85b04e32af58a3c008c0ba8169017770aaa342a5972b748f81d043d66363e437", + "sha256:9c4db4cfc8ac219b50484a505e3327142db04d5e9954aaed00ab729ef4beb142", + "sha256:9ed273d82116fa148c92901b9639030e087979d455982bd7bf727fb486c0bd17", + "sha256:a1af59e6c9b4acc07c429d8495fc016a35e0a1270f28c57317352f512df7e214", + "sha256:b894ff4daf8dfaf657bf2d5e7190a4de11b2400b1e0fb0902974d35c23a26dea", + "sha256:c2659981150b4de04397dcfd4bff64e384d3ba25af60d1b22820fdf108298cb2", + "sha256:c347d0a129c9742fefeaecf2455576c5ae73362aa01a27cea26fac237b7e2a87", + "sha256:c981a750858f1727995acf861ab030b267d264ca6efda2f01104941187a3675f", + "sha256:cc4152b19ec168391f7815d24b70c8911829ba281bd5fcd98cab9dc21abe62ff", + "sha256:d0f5b1668e7f7f6fc9849f49a20c5db10562a0ab29cd66818894dfebbca7b304", + "sha256:d7152006ed1a3adb8f978077b57d237ddafa188240af53cd72b5c79e4ed000e3", + "sha256:e5f877472993474296125c22b84c334b550010815e513cccce73da854a132d64", + "sha256:e7c2c87ff2f99ed4be1bb046d6eddfb388af627928037f9e0a420c05daaf14ed", + "sha256:edd7d499685655031be5b4d33005096b6345f81eeb7ab9d2dd415db0c7bcf64e", + "sha256:f99a777fda569a88deea863eac2722b5e88957c4d5f4413949740da791857ac9" ], "index": "pypi", - "version": "==0.28.2" + "version": "==0.28.3" }, "e1839a8": { "editable": true, @@ -112,14 +113,30 @@ }, "idna": { "hashes": [ - "sha256:2c6a5de3089009e3da7c5dde64a141dbc8551d5b7f6cf4ed7c2568d0cc520a8f", - "sha256:8c7309c718f94b3a625cb648ace320157ad16ff131ae0af362c9f21b80ef6ec4" + "sha256:156a6814fb5ac1fc6850fb002e0852d56c0c8d2531923a51032d1b70760e186e", + "sha256:684a38a6f903c1d71d6d5fac066b58d7768af4de2b832e426ec79c30daa94a16" ], - "version": "==2.6" + "version": "==2.7" + }, + "immutables": { + "hashes": [ + "sha256:1614177d08408176a7b12313d6efefad41c3ba17e56cbeb404bb74f83cfdeaac", + "sha256:18cd84d5ff10ffd42db163d4aad88128d86c2939335b068852f8f399594521ee", + "sha256:4d29f0e6a880a20f94c9efd4672212c3276fa303621a4bdf2a5e4ba7f27cba84", + "sha256:5180319a7aebc9319e63e7ea663830d46566b84bed91cbcdb50e247d65fbda07", + "sha256:57d17742fccec4c0466ab658053fab1a327df42241e7b1cab28e5b85e7066ac1", + "sha256:75e2e4e9441938690821adc1ab00829a174f696aef1c52ae6aba767219d1e84f", + "sha256:812fd1124fca2f56a3a7f376f34ac44719f131d82b6001323aef84faa8be7f00", + "sha256:8a3d2fc0d9db57188f28d3e6cefe45e9905292965895a7a391be6ededa626811", + "sha256:95da22e2d16b47dbd3464ae447b94f8d74866bce045be216dd1691f2f1838a94", + "sha256:9c8e0267a4d35032ccc670e5ab52aba2693ded330c4c1f2a4870b4a7c86b65af", + "sha256:da1c24d6ab6b38604444b4b767b3bf6074dfa3f7d15d8192a3a4935f61f00bd6" + ], + "version": "==0.5" }, "kivy": { "git": "git://github.com/matham/kivy.git", - "ref": "async-loop" + "ref": "4f62d0d8754ae2b590b44573200ea07b983f0f7b" }, "msgpack": { "hashes": [ @@ -144,59 +161,57 @@ }, "multio": { "hashes": [ - "sha256:53fd38f5d90a5f1a5d2db507b73c474ef851f5465fab27ffabe401591808258a", - "sha256:a6219395a1f84605c9041f0a7e8a529b989557c8a95920ddcd29fbed1d721758", - "sha256:f61bc6cf0ee8ea0ba32d5b9ae5ae1cadaebc39b6635a9b3d54142ded78164fe3" + "sha256:dcaee4d5d77cde8caf7902c8621aaa192febb384c7b1291fd47cfa41ac0eaebc" ], - "version": "==0.2.1" + "version": "==0.2.3" }, "numpy": { "hashes": [ - "sha256:0739146eaf4985962f07c62f7133aca89f3a600faac891ce6c7f3a1e2afe5272", - "sha256:07e21f14490324cc1160db101e9b6c1233c33985af4cb1d301dd02650fea1d7f", - "sha256:0f6a5ed0cd7ab1da11f5c07a8ecada73fc55a70ef7bb6311a4109891341d7277", - "sha256:0fd65cbbfdbf76bbf80c445d923b3accefea0fe2c2082049e0ce947c81fe1d3f", - "sha256:20cac3123d791e4bf8482a580d98d6b5969ba348b9d5364df791ba3a666b660d", - "sha256:528ce59ded2008f9e8543e0146acb3a98a9890da00adf8904b1e18c82099418b", - "sha256:56e392b7c738bd70e6f46cf48c8194d3d1dd4c5a59fae4b30c58bb6ef86e5233", - "sha256:675e0f23967ce71067d12b6944add505d5f0a251f819cfb44bdf8ee7072c090d", - "sha256:6be6b0ca705321c178c9858e5ad5611af664bbdfae1df1541f938a840a103888", - "sha256:719d914f564f35cce4dc103808f8297c807c9f0297ac183ed81ae8b5650e698e", - "sha256:768e777cc1ffdbf97c507f65975c8686ebafe0f3dc8925d02ac117acc4669ce9", - "sha256:7f76d406c6b998d6410198dcb82688dcdaec7d846aa87e263ccf52efdcfeba30", - "sha256:8c18ee4dddd5c6a811930c0a7c7947bf16387da3b394725f6063f1366311187d", - "sha256:99051e03b445117b26028623f1a487112ddf61a09a27e2d25e6bc07d37d94f25", - "sha256:a1413d06abfa942ca0553bf3bccaff5fdb36d55b84f2248e36228db871147dab", - "sha256:a7157c9ac6bddd2908c35ef099e4b643bc0e0ebb4d653deb54891d29258dd329", - "sha256:a958bf9d4834c72dee4f91a0476e7837b8a2966dc6fcfc42c421405f98d0da51", - "sha256:bb370120de6d26004358611441e07acda26840e41dfedc259d7f8cc613f96495", - "sha256:d0928076d9bd8a98de44e79b1abe50c1456e7abbb40af7ef58092086f1a6c729", - "sha256:d858423f5ed444d494b15c4cc90a206e1b8c31354c781ac7584da0d21c09c1c3", - "sha256:e6120d63b50e2248219f53302af7ec6fa2a42ed1f37e9cda2c76dbaca65036a7", - "sha256:f2b1378b63bdb581d5d7af2ec0373c8d40d651941d283a2afd7fc71184b3f570", - "sha256:facc6f925c3099ac01a1f03758100772560a0b020fb9d70f210404be08006bcb" + "sha256:07379fe0b450f6fd6e5934a9bc015025bb4ce1c8fbed3ca8bef29328b1bc9570", + "sha256:085afac75bbc97a096744fcfc97a4b321c5a87220286811e85089ae04885acdd", + "sha256:2d6481c6bdab1c75affc0fc71eb1bd4b3ecef620d06f2f60c3f00521d54be04f", + "sha256:2df854df882d322d5c23087a4959e145b953dfff2abe1774fec4f639ac2f3160", + "sha256:381ad13c30cd1d0b2f3da8a0c1a4aa697487e8bb0e9e0cbeb7439776bcb645f8", + "sha256:385f1ce46e08676505b692bfde918c1e0b350963a15ef52d77691c2cf0f5dbf6", + "sha256:4d278c2261be6423c5e63d8f0ceb1b0c6db3ff83f2906f4b860db6ae99ca1bb5", + "sha256:51c5dcb51cf88b34b7d04c15f600b07c6ccbb73a089a38af2ab83c02862318da", + "sha256:589336ba5199c8061239cf446ee2f2f1fcc0c68e8531ee1382b6fc0c66b2d388", + "sha256:5edf1acc827ed139086af95ce4449b7b664f57a8c29eb755411a634be280d9f2", + "sha256:6b82b81c6b3b70ed40bc6d0b71222ebfcd6b6c04a6e7945a936e514b9113d5a3", + "sha256:6c57f973218b776195d0356e556ec932698f3a563e2f640cfca7020086383f50", + "sha256:758d1091a501fd2d75034e55e7e98bfd1370dc089160845c242db1c760d944d9", + "sha256:8622db292b766719810e0cb0f62ef6141e15fe32b04e4eb2959888319e59336b", + "sha256:8b8dcfcd630f1981f0f1e3846fae883376762a0c1b472baa35b145b911683b7b", + "sha256:97fa8f1dceffab782069b291e38c4c2227f255cdac5f1e3346666931df87373e", + "sha256:9d69967673ab7b028c2df09cae05ba56bf4e39e3cb04ebe452b6035c3b49848e", + "sha256:9e1f53afae865cc32459ad211493cf9e2a3651a7295b7a38654ef3d123808996", + "sha256:a4a433b3a264dbc9aa9c7c241e87c0358a503ea6394f8737df1683c7c9a102ac", + "sha256:baadc5f770917ada556afb7651a68176559f4dca5f4b2d0947cd15b9fb84fb51", + "sha256:c725d11990a9243e6ceffe0ab25a07c46c1cc2c5dc55e305717b5afe856c9608", + "sha256:d696a8c87315a83983fc59dd27efe034292b9e8ad667aeae51a68b4be14690d9", + "sha256:e1864a4e9f93ddb2dc6b62ccc2ec1f8250ff4ac0d3d7a15c8985dd4e1fbd6418" ], - "version": "==1.14.2" + "version": "==1.14.5" }, "pandas": { "hashes": [ - "sha256:02541a4fdd31315f213a5c8e18708abad719ee03eda05f603c4fe973e9b9d770", - "sha256:052a66f58783a59ea38fdfee25de083b107baa81fdbe38fabd169d0f9efce2bf", - "sha256:06efae5c00b9f4c6e6d3fe1eb52e590ff0ea8e5cb58032c724e04d31c540de53", - "sha256:12f2a19d0b0adf31170d98d0e8bcbc59add0965a9b0c65d39e0665400491c0c5", - "sha256:244ae0b9e998cfa88452a49b20e29bf582cc7c0e69093876d505aec4f8e1c7fe", - "sha256:2907f3fe91ca2119ac3c38de6891bbbc83333bfe0d98309768fee28de563ee7a", - "sha256:44a94091dd71f05922eec661638ec1a35f26d573c119aa2fad964f10a2880e6c", - "sha256:587a9816cc663c958fcff7907c553b73fe196604f990bc98e1b71ebf07e45b44", - "sha256:66403162c8b45325a995493bdd78ad4d8be085e527d721dbfa773d56fbba9c88", - "sha256:68ac484e857dcbbd07ea7c6f516cc67f7f143f5313d9bc661470e7f473528882", - "sha256:68b121d13177f5128a4c118bb4f73ba40df28292c038389961aa55ea5a996427", - "sha256:97c8223d42d43d86ca359a57b4702ca0529c6553e83d736e93a5699951f0f8db", - "sha256:af0dbac881f6f87acd325415adea0ce8cccf28f5d4ad7a54b6a1e176e2f7bf70", - "sha256:c2cd884794924687edbaad40d18ac984054d247bb877890932c4d41e3c3aba31", - "sha256:c372db80a5bcb143c9cb254d50f902772c3b093a4f965275197ec2d2184b1e61" + "sha256:211cfdb9f72f26d2ede21c751d27e08fed4434d47fb9bb82ebc8ff753888b8b6", + "sha256:28fd087514616549a0e3259cd68ac88d7eaed6bd3062017a7f312e27941266bd", + "sha256:2fb7c63138bd5ead296b18b2cb6abd3a394f7581e5ae052b02b27df8244b03ca", + "sha256:372435456c349a8d39ff001967b161f6bd29d4c3de145a4cf9b366648defbb1f", + "sha256:3790a3348ab0f416e58061d21693cb662fbb2f638001b94bf2b2199fedc1b1c2", + "sha256:437a6e906a6717a9ed2627cf6e7895b63dfaa0172567cbd75a553f55cf78cc17", + "sha256:50b52af2af2e15f4aeb2fe196da073a8c131fa02e433e105d95ce40016df5690", + "sha256:720daad75b5d35dd1b446842210c4f3fd447464c9c0884972f3f12b213a9edd1", + "sha256:b4fb71acbc2709b8f5993cb4b5445d8182864f11c39787e317aae39f21206270", + "sha256:b704fd73022342cce612996de495a16954311e0c0cf077c1b83d5cf0b9656a60", + "sha256:cbbecca0c7af6a2160b2d6ba30becc286824a98c61dcc6a41fada664f226424c", + "sha256:d2a071de755cc8ee7784e1b4c7b9b643d951d35c8adea7d64fe7c57cff9c47a7", + "sha256:d8154c5c68713a82461aba735832f0b4692be8a45a0a340a303bf90d6f80f36f", + "sha256:e1b86f7c55467ce1f6c12715f2fd1817f4a909b5c8c39bd4b5d2415ef2b04bd8", + "sha256:fcc63e8134516e93e16eb4ceac9afaa51f4adc5bf58efddae7cbc562f5b77dd0" ], - "version": "==0.22.0" + "version": "==0.23.1" }, "pdbpp": { "hashes": [ @@ -214,10 +229,10 @@ }, "python-dateutil": { "hashes": [ - "sha256:3220490fb9741e2342e1cf29a503394fdac874bc39568288717ee67047ff29df", - "sha256:9d8074be4c993fbe4947878ce593052f71dac82932a677d49194d8ce9778002e" + "sha256:1adb80e7a782c12e52ef9a8182bebeb73f1d7e24e374397af06fb4956c8dc5c0", + "sha256:e27001de32f627c22380a688bcc43ce83504a7bc5da472209b4c70f02829f0b8" ], - "version": "==2.7.2" + "version": "==2.7.3" }, "pytz": { "hashes": [ @@ -235,15 +250,14 @@ }, "sortedcontainers": { "hashes": [ - "sha256:566cf5f8dbada3aed99737a19d98f03d15d76bf2a6c27e4fb0f4a718a99be761", - "sha256:fa96e9920a37bde76bfdcaca919a125c1d2e581af1137e25de54ee0da7835282" + "sha256:607294c6e291a270948420f7ffa1fb3ed47384a4c08db6d1e9c92d08a6981982", + "sha256:ef38b128302ee8f65d81e31c9d8fbf10d81df4d6d06c9c0b66f01d33747525bb" ], - "version": "==1.5.10" + "version": "==2.0.4" }, "trio": { "hashes": [ "sha256:0496f575ed118eb382346a728971766d54b3a3e39e9825e94d4d513b0fe96145", - "sha256:8266525067dc6553592383bef1832556e6962daba765fa42a81880c5d9f4b785", "sha256:fc5513551d22ec2be8bd05ddd56e9c3c377ac47c79a3866fa2d8710bfae4a0cb" ], "version": "==0.4.0" @@ -258,9 +272,9 @@ "develop": { "asks": { "hashes": [ - "sha256:d76a6314ecd7d2f920d2e94b8d7bcbb7a0941aa4c915874869c503c757088df2" + "sha256:c3fc1115dfeb414ef0863da6f60f02aea7487f92f76b645738774bf93e8577de" ], - "version": "==1.3.11" + "version": "==2.0.0" }, "async-generator": { "hashes": [ @@ -269,12 +283,19 @@ ], "version": "==1.9" }, + "atomicwrites": { + "hashes": [ + "sha256:240831ea22da9ab882b551b31d4225591e5e447a68c5e188db5b89ca1d487585", + "sha256:a24da68318b08ac9c9c45029f4a10371ab5b20e4226738e150e6e7c571630ae6" + ], + "version": "==1.1.5" + }, "attrs": { "hashes": [ - "sha256:1c7960ccfd6a005cd9f7ba884e6316b5e430a3f1a6c37c5f87d8b43f83b54ec9", - "sha256:a17a9573a6f475c99b551c0e0a812707ddda1ec9653bed04c13841404ed6f450" + "sha256:4b90b09eeeb9b88c35bc642cbac057e45a5fd85367b985bd2809c62b7b939265", + "sha256:e0d0eb91441a3b53dab4d9b743eafc1ac44476296a2053b6ca3af0b139faf87b" ], - "version": "==17.4.0" + "version": "==18.1.0" }, "click": { "hashes": [ @@ -292,46 +313,47 @@ }, "contextvars": { "hashes": [ - "sha256:e9f9c5763d5a2afa6e420218e53954bd3829fc2d8acd806b98139d28c362cdf9" + "sha256:7d73f8b1426cf0200fbe16900fcd73c9be29c54546b48a3980727e670b1acb10" ], "markers": "python_version < '3.7'", - "version": "==2.1" + "version": "==2.2" }, "cython": { "hashes": [ - "sha256:03db8c1b8120039f72493b95494a595be13b01b6860cfc93e2a651a001847b3b", - "sha256:0d2ccb812d73e67557fd16e7aa7bc5bac18933c1dfe306133cd0680ccab89f33", - "sha256:24f8ea864de733f5a447896cbeec2cac212247e33272539670b9f466f43f23db", - "sha256:30a8fd029eb932a7b5a74e158316d1d069ccb67a8607aa7b6c4ed19fab7fbd4a", - "sha256:37e680901e6a4b97ab67717f9b43fc58542cd10a77431efd2d8801d21d5a37d4", - "sha256:4984e097bc9da37862d97c1f66dacf2c80fadaea488d96ba0b5ea9d84dbc7521", - "sha256:4cfda677227af41e4502e088ee9875e71922238a207d0c40785a0fb09c703c21", - "sha256:4ec60a4086a175a81b9258f810440a6dd2671aa4b419d8248546d85a7de6a93f", - "sha256:51c7d48ea4cba532d11a6d128ebbc15373013f816e5d1c3a3946650b582a30b8", - "sha256:634e2f10fc8d026c633cffacb45cd8f4582149fa68e1428124e762dbc566e68a", - "sha256:67e0359709c8addc3ecb19e1dec6d84d67647e3906da618b953001f6d4480275", - "sha256:6a93d4ba0461edc7a359241f4ebbaa8f9bc9490b3540a8dd0460bef8c2c706db", - "sha256:6ba89d56c3ee45716378cda4f0490c3abe1edf79dce8b997f31608b14748a52b", - "sha256:6ca5436d470584ba6fd399a802c9d0bcf76cf1edb0123725a4de2f0048f9fa07", - "sha256:7656895cdd59d56dd4ed326d1ee9ede727020d4a5d8778a05af2d8e25af4b13d", - "sha256:85f7432776870d65639fed00f951a3c05ef1e534bc72a73cd1200d79b9a7d7d0", - "sha256:96dd674e72281d3feed74fd5adcf0514ba02884f123cdf4fb78567e7be6b1694", - "sha256:97bf06a89bcf9e8d7633cde89274d42b3b661dc974b58fca066fad762e46b4d8", - "sha256:9a465e7296a4629139be5d2015577f2ae5e08196eb7dc4c407beea130f362dc3", - "sha256:9a60355edca1cc9006be086e2633e190542aad2bf9e46948792a48b3ae28ed97", - "sha256:9eab3696f2cb88167db109d737c787fb9dd34ca414bd1e0c424e307956e02c94", - "sha256:c3ae7d40ebceb0d944dfeeceaf1fbf17e528f5327d97b008a8623ddddd1ecee3", - "sha256:c623d19fcc60ea27882f20cf484218926ddf6f978b958dae1070600a1974f809", - "sha256:c719a6e86d7c737afcc9729994f76b284d1c512099ee803eff11c2a9e6e33a42", - "sha256:cf17af0433218a1e33dc6f3069dd9e7cd0c80fe505972c3acd548e25f67973fd", - "sha256:daf96e0d232605e979995795f62ffd24c5c6ecea4526e4cbb86d80f01da954b2", - "sha256:db40de7d03842d3c4625028a74189ade52b27f8efaeb0d2ca06474f57e0813b2", - "sha256:deea1ef59445568dd7738fa3913aea7747e4927ff4ae3c10737844b8a5dd3e22", - "sha256:e05d28b5ce1ee5939d83e50344980659688ecaed65c5e10214d817ecf5d1fe6a", - "sha256:f5f6694ce668eb7a9b59550bfe4265258809c9b0665c206b26d697df2eef2a8b" + "sha256:0344e9352b0915910e212c38403b63f902ce1cba75dde7a43a9112ff960eb2a5", + "sha256:0a390c39e912fc5f82d5feae2d16ea061971407099e1efb0fecb255cb96fbeff", + "sha256:0f2b2e09f94c498f555935e732b7321b5f62f00e7a789238f6c5ddd66987a54d", + "sha256:15614592616b6dd5e919e158796350ebeba6cb6b5d2998cfff41b53f568c8355", + "sha256:1aae6d6e9858888144cea147eb5e677830f45faaff3d305d77378c3cba55f526", + "sha256:200583297f23e558744bc4688d8a2b2605ab6ad7d1494a9fd8c8094ad65ebf3c", + "sha256:295facc211a6b55db9979455b856180f2839be22ab767ffdea55986bee83ca9f", + "sha256:36c16bf39280fe857213d8da31c07a6179d3878c3dc2e435dce0974b9f8f0729", + "sha256:3fef8dfa9cf86ab7814ca31369374ddd5b9524f54406aa83b53b5937965b8e88", + "sha256:439d233d3214e3d69c033a9a93516758f2c8a03e83ea51ae14b6eed13687d224", + "sha256:455ab39c6c0849a6c008fcdf2fae42475f18d0801a3be229e8f75367bbe3b325", + "sha256:56821e3791209e6a11992e294afbf7e3dcda7d4fd54d06396dd521928d3d14fe", + "sha256:62b594584889b33bbea7e71f9d7c5c6539091b341334ef7ca1ae7e30a9dd3e15", + "sha256:70f81a75fb25c1c3c61843e3a6fe771a76c4ebf4d154455a7eff0740ad47dff4", + "sha256:8011090beb09251cb4ece1e14263e574b38eda696b788552b369ad343373d0e9", + "sha256:80d6a0369333a162fc32a22637f5870f3e87fb038c7b58860bbe00b05b58aa62", + "sha256:85b04e32af58a3c008c0ba8169017770aaa342a5972b748f81d043d66363e437", + "sha256:9c4db4cfc8ac219b50484a505e3327142db04d5e9954aaed00ab729ef4beb142", + "sha256:9ed273d82116fa148c92901b9639030e087979d455982bd7bf727fb486c0bd17", + "sha256:a1af59e6c9b4acc07c429d8495fc016a35e0a1270f28c57317352f512df7e214", + "sha256:b894ff4daf8dfaf657bf2d5e7190a4de11b2400b1e0fb0902974d35c23a26dea", + "sha256:c2659981150b4de04397dcfd4bff64e384d3ba25af60d1b22820fdf108298cb2", + "sha256:c347d0a129c9742fefeaecf2455576c5ae73362aa01a27cea26fac237b7e2a87", + "sha256:c981a750858f1727995acf861ab030b267d264ca6efda2f01104941187a3675f", + "sha256:cc4152b19ec168391f7815d24b70c8911829ba281bd5fcd98cab9dc21abe62ff", + "sha256:d0f5b1668e7f7f6fc9849f49a20c5db10562a0ab29cd66818894dfebbca7b304", + "sha256:d7152006ed1a3adb8f978077b57d237ddafa188240af53cd72b5c79e4ed000e3", + "sha256:e5f877472993474296125c22b84c334b550010815e513cccce73da854a132d64", + "sha256:e7c2c87ff2f99ed4be1bb046d6eddfb388af627928037f9e0a420c05daaf14ed", + "sha256:edd7d499685655031be5b4d33005096b6345f81eeb7ab9d2dd415db0c7bcf64e", + "sha256:f99a777fda569a88deea863eac2722b5e88957c4d5f4413949740da791857ac9" ], "index": "pypi", - "version": "==0.28.2" + "version": "==0.28.3" }, "e1839a8": { "editable": true, @@ -352,18 +374,34 @@ }, "idna": { "hashes": [ - "sha256:2c6a5de3089009e3da7c5dde64a141dbc8551d5b7f6cf4ed7c2568d0cc520a8f", - "sha256:8c7309c718f94b3a625cb648ace320157ad16ff131ae0af362c9f21b80ef6ec4" + "sha256:156a6814fb5ac1fc6850fb002e0852d56c0c8d2531923a51032d1b70760e186e", + "sha256:684a38a6f903c1d71d6d5fac066b58d7768af4de2b832e426ec79c30daa94a16" ], - "version": "==2.6" + "version": "==2.7" + }, + "immutables": { + "hashes": [ + "sha256:1614177d08408176a7b12313d6efefad41c3ba17e56cbeb404bb74f83cfdeaac", + "sha256:18cd84d5ff10ffd42db163d4aad88128d86c2939335b068852f8f399594521ee", + "sha256:4d29f0e6a880a20f94c9efd4672212c3276fa303621a4bdf2a5e4ba7f27cba84", + "sha256:5180319a7aebc9319e63e7ea663830d46566b84bed91cbcdb50e247d65fbda07", + "sha256:57d17742fccec4c0466ab658053fab1a327df42241e7b1cab28e5b85e7066ac1", + "sha256:75e2e4e9441938690821adc1ab00829a174f696aef1c52ae6aba767219d1e84f", + "sha256:812fd1124fca2f56a3a7f376f34ac44719f131d82b6001323aef84faa8be7f00", + "sha256:8a3d2fc0d9db57188f28d3e6cefe45e9905292965895a7a391be6ededa626811", + "sha256:95da22e2d16b47dbd3464ae447b94f8d74866bce045be216dd1691f2f1838a94", + "sha256:9c8e0267a4d35032ccc670e5ab52aba2693ded330c4c1f2a4870b4a7c86b65af", + "sha256:da1c24d6ab6b38604444b4b767b3bf6074dfa3f7d15d8192a3a4935f61f00bd6" + ], + "version": "==0.5" }, "more-itertools": { "hashes": [ - "sha256:0dd8f72eeab0d2c3bd489025bb2f6a1b8342f9b198f6fc37b52d15cfa4531fea", - "sha256:11a625025954c20145b37ff6309cd54e39ca94f72f6bb9576d1195db6fa2442e", - "sha256:c9ce7eccdcb901a2c75d326ea134e0886abfbea5f93e91cc95de9507c0816c44" + "sha256:2b6b9893337bfd9166bee6a62c2b0c9fe7735dcf85948b387ec8cba30e85d8e8", + "sha256:6703844a52d3588f951883005efcf555e49566a48afd4db4e965d69b883980d3", + "sha256:a18d870ef2ffca2b8463c0070ad17b5978056f403fb64e3f15fe62a52db21cc0" ], - "version": "==4.1.0" + "version": "==4.2.0" }, "msgpack": { "hashes": [ @@ -388,59 +426,57 @@ }, "multio": { "hashes": [ - "sha256:53fd38f5d90a5f1a5d2db507b73c474ef851f5465fab27ffabe401591808258a", - "sha256:a6219395a1f84605c9041f0a7e8a529b989557c8a95920ddcd29fbed1d721758", - "sha256:f61bc6cf0ee8ea0ba32d5b9ae5ae1cadaebc39b6635a9b3d54142ded78164fe3" + "sha256:dcaee4d5d77cde8caf7902c8621aaa192febb384c7b1291fd47cfa41ac0eaebc" ], - "version": "==0.2.1" + "version": "==0.2.3" }, "numpy": { "hashes": [ - "sha256:0739146eaf4985962f07c62f7133aca89f3a600faac891ce6c7f3a1e2afe5272", - "sha256:07e21f14490324cc1160db101e9b6c1233c33985af4cb1d301dd02650fea1d7f", - "sha256:0f6a5ed0cd7ab1da11f5c07a8ecada73fc55a70ef7bb6311a4109891341d7277", - "sha256:0fd65cbbfdbf76bbf80c445d923b3accefea0fe2c2082049e0ce947c81fe1d3f", - "sha256:20cac3123d791e4bf8482a580d98d6b5969ba348b9d5364df791ba3a666b660d", - "sha256:528ce59ded2008f9e8543e0146acb3a98a9890da00adf8904b1e18c82099418b", - "sha256:56e392b7c738bd70e6f46cf48c8194d3d1dd4c5a59fae4b30c58bb6ef86e5233", - "sha256:675e0f23967ce71067d12b6944add505d5f0a251f819cfb44bdf8ee7072c090d", - "sha256:6be6b0ca705321c178c9858e5ad5611af664bbdfae1df1541f938a840a103888", - "sha256:719d914f564f35cce4dc103808f8297c807c9f0297ac183ed81ae8b5650e698e", - "sha256:768e777cc1ffdbf97c507f65975c8686ebafe0f3dc8925d02ac117acc4669ce9", - "sha256:7f76d406c6b998d6410198dcb82688dcdaec7d846aa87e263ccf52efdcfeba30", - "sha256:8c18ee4dddd5c6a811930c0a7c7947bf16387da3b394725f6063f1366311187d", - "sha256:99051e03b445117b26028623f1a487112ddf61a09a27e2d25e6bc07d37d94f25", - "sha256:a1413d06abfa942ca0553bf3bccaff5fdb36d55b84f2248e36228db871147dab", - "sha256:a7157c9ac6bddd2908c35ef099e4b643bc0e0ebb4d653deb54891d29258dd329", - "sha256:a958bf9d4834c72dee4f91a0476e7837b8a2966dc6fcfc42c421405f98d0da51", - "sha256:bb370120de6d26004358611441e07acda26840e41dfedc259d7f8cc613f96495", - "sha256:d0928076d9bd8a98de44e79b1abe50c1456e7abbb40af7ef58092086f1a6c729", - "sha256:d858423f5ed444d494b15c4cc90a206e1b8c31354c781ac7584da0d21c09c1c3", - "sha256:e6120d63b50e2248219f53302af7ec6fa2a42ed1f37e9cda2c76dbaca65036a7", - "sha256:f2b1378b63bdb581d5d7af2ec0373c8d40d651941d283a2afd7fc71184b3f570", - "sha256:facc6f925c3099ac01a1f03758100772560a0b020fb9d70f210404be08006bcb" + "sha256:07379fe0b450f6fd6e5934a9bc015025bb4ce1c8fbed3ca8bef29328b1bc9570", + "sha256:085afac75bbc97a096744fcfc97a4b321c5a87220286811e85089ae04885acdd", + "sha256:2d6481c6bdab1c75affc0fc71eb1bd4b3ecef620d06f2f60c3f00521d54be04f", + "sha256:2df854df882d322d5c23087a4959e145b953dfff2abe1774fec4f639ac2f3160", + "sha256:381ad13c30cd1d0b2f3da8a0c1a4aa697487e8bb0e9e0cbeb7439776bcb645f8", + "sha256:385f1ce46e08676505b692bfde918c1e0b350963a15ef52d77691c2cf0f5dbf6", + "sha256:4d278c2261be6423c5e63d8f0ceb1b0c6db3ff83f2906f4b860db6ae99ca1bb5", + "sha256:51c5dcb51cf88b34b7d04c15f600b07c6ccbb73a089a38af2ab83c02862318da", + "sha256:589336ba5199c8061239cf446ee2f2f1fcc0c68e8531ee1382b6fc0c66b2d388", + "sha256:5edf1acc827ed139086af95ce4449b7b664f57a8c29eb755411a634be280d9f2", + "sha256:6b82b81c6b3b70ed40bc6d0b71222ebfcd6b6c04a6e7945a936e514b9113d5a3", + "sha256:6c57f973218b776195d0356e556ec932698f3a563e2f640cfca7020086383f50", + "sha256:758d1091a501fd2d75034e55e7e98bfd1370dc089160845c242db1c760d944d9", + "sha256:8622db292b766719810e0cb0f62ef6141e15fe32b04e4eb2959888319e59336b", + "sha256:8b8dcfcd630f1981f0f1e3846fae883376762a0c1b472baa35b145b911683b7b", + "sha256:97fa8f1dceffab782069b291e38c4c2227f255cdac5f1e3346666931df87373e", + "sha256:9d69967673ab7b028c2df09cae05ba56bf4e39e3cb04ebe452b6035c3b49848e", + "sha256:9e1f53afae865cc32459ad211493cf9e2a3651a7295b7a38654ef3d123808996", + "sha256:a4a433b3a264dbc9aa9c7c241e87c0358a503ea6394f8737df1683c7c9a102ac", + "sha256:baadc5f770917ada556afb7651a68176559f4dca5f4b2d0947cd15b9fb84fb51", + "sha256:c725d11990a9243e6ceffe0ab25a07c46c1cc2c5dc55e305717b5afe856c9608", + "sha256:d696a8c87315a83983fc59dd27efe034292b9e8ad667aeae51a68b4be14690d9", + "sha256:e1864a4e9f93ddb2dc6b62ccc2ec1f8250ff4ac0d3d7a15c8985dd4e1fbd6418" ], - "version": "==1.14.2" + "version": "==1.14.5" }, "pandas": { "hashes": [ - "sha256:02541a4fdd31315f213a5c8e18708abad719ee03eda05f603c4fe973e9b9d770", - "sha256:052a66f58783a59ea38fdfee25de083b107baa81fdbe38fabd169d0f9efce2bf", - "sha256:06efae5c00b9f4c6e6d3fe1eb52e590ff0ea8e5cb58032c724e04d31c540de53", - "sha256:12f2a19d0b0adf31170d98d0e8bcbc59add0965a9b0c65d39e0665400491c0c5", - "sha256:244ae0b9e998cfa88452a49b20e29bf582cc7c0e69093876d505aec4f8e1c7fe", - "sha256:2907f3fe91ca2119ac3c38de6891bbbc83333bfe0d98309768fee28de563ee7a", - "sha256:44a94091dd71f05922eec661638ec1a35f26d573c119aa2fad964f10a2880e6c", - "sha256:587a9816cc663c958fcff7907c553b73fe196604f990bc98e1b71ebf07e45b44", - "sha256:66403162c8b45325a995493bdd78ad4d8be085e527d721dbfa773d56fbba9c88", - "sha256:68ac484e857dcbbd07ea7c6f516cc67f7f143f5313d9bc661470e7f473528882", - "sha256:68b121d13177f5128a4c118bb4f73ba40df28292c038389961aa55ea5a996427", - "sha256:97c8223d42d43d86ca359a57b4702ca0529c6553e83d736e93a5699951f0f8db", - "sha256:af0dbac881f6f87acd325415adea0ce8cccf28f5d4ad7a54b6a1e176e2f7bf70", - "sha256:c2cd884794924687edbaad40d18ac984054d247bb877890932c4d41e3c3aba31", - "sha256:c372db80a5bcb143c9cb254d50f902772c3b093a4f965275197ec2d2184b1e61" + "sha256:211cfdb9f72f26d2ede21c751d27e08fed4434d47fb9bb82ebc8ff753888b8b6", + "sha256:28fd087514616549a0e3259cd68ac88d7eaed6bd3062017a7f312e27941266bd", + "sha256:2fb7c63138bd5ead296b18b2cb6abd3a394f7581e5ae052b02b27df8244b03ca", + "sha256:372435456c349a8d39ff001967b161f6bd29d4c3de145a4cf9b366648defbb1f", + "sha256:3790a3348ab0f416e58061d21693cb662fbb2f638001b94bf2b2199fedc1b1c2", + "sha256:437a6e906a6717a9ed2627cf6e7895b63dfaa0172567cbd75a553f55cf78cc17", + "sha256:50b52af2af2e15f4aeb2fe196da073a8c131fa02e433e105d95ce40016df5690", + "sha256:720daad75b5d35dd1b446842210c4f3fd447464c9c0884972f3f12b213a9edd1", + "sha256:b4fb71acbc2709b8f5993cb4b5445d8182864f11c39787e317aae39f21206270", + "sha256:b704fd73022342cce612996de495a16954311e0c0cf077c1b83d5cf0b9656a60", + "sha256:cbbecca0c7af6a2160b2d6ba30becc286824a98c61dcc6a41fada664f226424c", + "sha256:d2a071de755cc8ee7784e1b4c7b9b643d951d35c8adea7d64fe7c57cff9c47a7", + "sha256:d8154c5c68713a82461aba735832f0b4692be8a45a0a340a303bf90d6f80f36f", + "sha256:e1b86f7c55467ce1f6c12715f2fd1817f4a909b5c8c39bd4b5d2415ef2b04bd8", + "sha256:fcc63e8134516e93e16eb4ceac9afaa51f4adc5bf58efddae7cbc562f5b77dd0" ], - "version": "==0.22.0" + "version": "==0.23.1" }, "pdbpp": { "hashes": [ @@ -459,10 +495,10 @@ }, "py": { "hashes": [ - "sha256:29c9fab495d7528e80ba1e343b958684f4ace687327e6f789a94bf3d1915f881", - "sha256:983f77f3331356039fdd792e9220b7b8ee1aa6bd2b25f567a963ff1de5a64f6a" + "sha256:3fd59af7435864e1a243790d322d763925431213b6b8529c6ca71081ace3bbf7", + "sha256:e31fb2767eb657cbde86c454f02e99cb846d3cd9d61b318525140214fdc0e98e" ], - "version": "==1.5.3" + "version": "==1.5.4" }, "pygments": { "hashes": [ @@ -473,18 +509,18 @@ }, "pytest": { "hashes": [ - "sha256:6266f87ab64692112e5477eba395cfedda53b1933ccd29478e671e73b420c19c", - "sha256:fae491d1874f199537fd5872b5e1f0e74a009b979df9d53d1553fd03da1703e1" + "sha256:8ea01fc4fcc8e1b1e305252b4bc80a1528019ab99fd3b88666c9dc38d754406c", + "sha256:90898786b3d0b880b47645bae7b51aa9bbf1e9d1e4510c2cfd15dd65c70ea0cd" ], "index": "pypi", - "version": "==3.5.0" + "version": "==3.6.2" }, "python-dateutil": { "hashes": [ - "sha256:3220490fb9741e2342e1cf29a503394fdac874bc39568288717ee67047ff29df", - "sha256:9d8074be4c993fbe4947878ce593052f71dac82932a677d49194d8ce9778002e" + "sha256:1adb80e7a782c12e52ef9a8182bebeb73f1d7e24e374397af06fb4956c8dc5c0", + "sha256:e27001de32f627c22380a688bcc43ce83504a7bc5da472209b4c70f02829f0b8" ], - "version": "==2.7.2" + "version": "==2.7.3" }, "pytz": { "hashes": [ @@ -502,15 +538,14 @@ }, "sortedcontainers": { "hashes": [ - "sha256:566cf5f8dbada3aed99737a19d98f03d15d76bf2a6c27e4fb0f4a718a99be761", - "sha256:fa96e9920a37bde76bfdcaca919a125c1d2e581af1137e25de54ee0da7835282" + "sha256:607294c6e291a270948420f7ffa1fb3ed47384a4c08db6d1e9c92d08a6981982", + "sha256:ef38b128302ee8f65d81e31c9d8fbf10d81df4d6d06c9c0b66f01d33747525bb" ], - "version": "==1.5.10" + "version": "==2.0.4" }, "trio": { "hashes": [ "sha256:0496f575ed118eb382346a728971766d54b3a3e39e9825e94d4d513b0fe96145", - "sha256:8266525067dc6553592383bef1832556e6962daba765fa42a81880c5d9f4b785", "sha256:fc5513551d22ec2be8bd05ddd56e9c3c377ac47c79a3866fa2d8710bfae4a0cb" ], "version": "==0.4.0" diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 5ec1071c..506e1869 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -8,11 +8,14 @@ import socket from types import ModuleType from typing import Coroutine, Callable -import msgpack import trio +from .. import tractor from ..log import get_logger +from ..ipc import Channel from . import get_brokermod + + log = get_logger('broker.core') @@ -69,138 +72,13 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: await trio.sleep(sleep) -class StreamQueue: - """Stream wrapped as a queue that delivers ``msgpack`` serialized objects. - """ - def __init__(self, stream): - self.stream = stream - self.peer = stream.socket.getpeername() - self._agen = self._iter_packets() - - async def _iter_packets(self): - """Yield packets from the underlying stream. - """ - unpacker = msgpack.Unpacker(raw=False) - while True: - try: - data = await self.stream.receive_some(2**10) - log.trace(f"Data is {data}") - except trio.BrokenStreamError: - log.error(f"Stream connection {self.peer} broke") - return - - if data == b'': - log.debug("Stream connection was closed") - return - - unpacker.feed(data) - for packet in unpacker: - yield packet - - async def put(self, data): - return await self.stream.send_all( - msgpack.dumps(data, use_bin_type=True)) - - async def get(self): - return await self._agen.asend(None) - - async def __aiter__(self): - return self._agen - - -class Client: - """The most basic client. - - Use this to talk to any micro-service daemon or other client(s) over a - TCP socket managed by ``trio``. - """ - def __init__( - self, sockaddr: tuple, - on_reconnect: Coroutine, - auto_reconnect: bool = True, - ): - self.sockaddr = sockaddr - self._recon_seq = on_reconnect - self._autorecon = auto_reconnect - self.squeue = None - - async def connect(self, sockaddr: tuple = None, **kwargs): - sockaddr = sockaddr or self.sockaddr - stream = await trio.open_tcp_stream(*sockaddr, **kwargs) - self.squeue = StreamQueue(stream) - return stream - - async def send(self, item): - await self.squeue.put(item) - - async def recv(self): - try: - return await self.squeue.get() - except trio.BrokenStreamError as err: - if self._autorecon: - await self._reconnect() - return await self.recv() - - async def aclose(self, *args): - await self.squeue.stream.aclose() - - async def __aenter__(self): - await self.connect(self.sockaddr) - return self - - async def __aexit__(self, *args): - await self.aclose(*args) - - async def _reconnect(self): - """Handle connection failures by polling until a reconnect can be - established. - """ - down = False - while True: - try: - with trio.move_on_after(3) as cancel_scope: - await self.connect() - cancelled = cancel_scope.cancelled_caught - if cancelled: - log.warn( - "Reconnect timed out after 3 seconds, retrying...") - continue - else: - log.warn("Stream connection re-established!") - # run any reconnection sequence - await self._recon_seq(self) - break - except (OSError, ConnectionRefusedError): - if not down: - down = True - log.warn( - f"Connection to {self.sockaddr} went down, waiting" - " for re-establishment") - await trio.sleep(1) - - async def aiter_recv(self): - """Async iterate items from underlying stream. - """ - while True: - try: - async for item in self.squeue: - yield item - except trio.BrokenStreamError as err: - if not self._autorecon: - raise - if self._autorecon: # attempt reconnect - await self._reconnect() - continue - else: - return - - async def stream_quotes( brokermod: ModuleType, get_quotes: Coroutine, - tickers2qs: {str: StreamQueue}, + tickers2chans: {str: Channel}, rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue + cid: str = None, ) -> None: """Stream quotes for a sequence of tickers at the given ``rate`` per second. @@ -219,11 +97,11 @@ async def stream_quotes( while True: # use an event here to trigger exit? prequote_start = time.time() - if not any(tickers2qs.values()): + if not any(tickers2chans.values()): log.warn(f"No subs left for broker {brokermod.name}, exiting task") break - tickers = list(tickers2qs.keys()) + tickers = list(tickers2chans.keys()) with trio.move_on_after(3) as cancel_scope: quotes = await get_quotes(tickers) @@ -234,7 +112,7 @@ async def stream_quotes( quotes = await wait_for_network(partial(get_quotes, tickers)) postquote_start = time.time() - q_payloads = {} + chan_payloads = {} for symbol, quote in quotes.items(): if diff_cached: # if cache is enabled then only deliver "new" changes @@ -244,25 +122,31 @@ async def stream_quotes( log.info( f"New quote {quote['symbol']}:\n{new}") _cache[symbol] = quote - for queue in tickers2qs[symbol]: - q_payloads.setdefault(queue, {})[symbol] = quote + for chan, cid in tickers2chans.get(symbol, set()): + chan_payloads.setdefault( + chan, + {'yield': {}, 'cid': cid} + )['yield'][symbol] = quote else: - for queue in tickers2qs[symbol]: - q_payloads.setdefault(queue, {})[symbol] = quote + for chan, cid in tickers2chans[symbol]: + chan_payloads.setdefault( + chan, + {'yield': {}, 'cid': cid} + )['yield'][symbol] = quote # deliver to each subscriber - if q_payloads: - for queue, payload in q_payloads.items(): + if chan_payloads: + for chan, payload in chan_payloads.items(): try: - await queue.put(payload) + await chan.send(payload) except ( # That's right, anything you can think of... trio.ClosedStreamError, ConnectionResetError, ConnectionRefusedError, ): - log.warn(f"{queue.peer} went down?") - for qset in tickers2qs.values(): - qset.discard(queue) + log.warn(f"{chan} went down?") + for chanset in tickers2chans.values(): + chanset.discard((chan, cid)) req_time = round(postquote_start - prequote_start, 3) proc_time = round(time.time() - postquote_start, 3) @@ -277,13 +161,104 @@ async def stream_quotes( log.debug(f"Sleeping for {delay}") await trio.sleep(delay) + log.info(f"Terminating stream quoter task for {brokermod.name}") -async def start_quoter( - broker2tickersubs: dict, - clients: dict, - dtasks: set, # daemon task registry - nursery: "Nusery", - stream: trio.SocketStream, + +async def get_cached_client(broker, tickers): + """Get the current actor's cached broker client if available or create a + new one. + """ + # check if a cached client is in the local actor's statespace + clients = tractor.current_actor().statespace.setdefault('clients', {}) + try: + return clients[broker] + except KeyError: + log.info(f"Creating new client for broker {broker}") + brokermod = get_brokermod(broker) + # TODO: move to AsyncExitStack in 3.7 + client_cntxmng = brokermod.get_client() + client = await client_cntxmng.__aenter__() + get_quotes = await brokermod.quoter(client, tickers) + clients[broker] = ( + brokermod, client, client_cntxmng, get_quotes) + + return brokermod, client, client_cntxmng, get_quotes + + +async def symbol_data(broker, tickers): + """Retrieve baseline symbol info from broker. + """ + _, client, _, get_quotes = await get_cached_client(broker, tickers) + return await client.symbol_data(tickers) + + +async def smoke_quote(get_quotes, tickers, broker): + """Do an initial "smoke" request for symbols in ``tickers`` filtering + out any symbols not supported by the broker queried in the call to + ``get_quotes()``. + """ + # TODO: trim out with #37 + ################################################# + # get a single quote filtering out any bad tickers + # NOTE: this code is always run for every new client + # subscription even when a broker quoter task is already running + # since the new client needs to know what symbols are accepted + log.warn(f"Retrieving smoke quote for symbols {tickers}") + quotes = await get_quotes(tickers) + # report any tickers that aren't returned in the first quote + invalid_tickers = set(tickers) - set(quotes) + for symbol in invalid_tickers: + tickers.remove(symbol) + log.warn( + f"Symbol `{symbol}` not found by broker `{broker}`" + ) + + # pop any tickers that return "empty" quotes + payload = {} + for symbol, quote in quotes.items(): + if quote is None: + log.warn( + f"Symbol `{symbol}` not found by broker" + f" `{broker}`") + # XXX: not this mutates the input list (for now) + tickers.remove(symbol) + continue + payload[symbol] = quote + + return payload + + # end of section to be trimmed out with #37 + ########################################### + + +def modify_quote_stream(broker, tickers, chan=None, cid=None): + """Absolute symbol subscription list for each quote stream. + """ + log.info(f"{chan} changed symbol subscription to {tickers}") + ss = tractor.current_actor().statespace + broker2tickersubs = ss['broker2tickersubs'] + tickers2chans = broker2tickersubs.get(broker) + # update map from each symbol to requesting client's chan + for ticker in tickers: + tickers2chans.setdefault(ticker, set()).add((chan, cid)) + + for ticker in filter( + lambda ticker: ticker not in tickers, tickers2chans.copy() + ): + chanset = tickers2chans.get(ticker) + if chanset: + chanset.discard((chan, cid)) + + if not chanset: + # pop empty sets which will trigger bg quoter task termination + tickers2chans.pop(ticker) + + +async def start_quote_stream( + broker: str, + tickers: [str], + chan: 'Channel' = None, + cid: str = None, ) -> None: """Handle per-broker quote stream subscriptions. @@ -291,116 +266,76 @@ async def start_quoter( Since most brokers seems to support batch quote requests we limit to one task per process for now. """ - queue = StreamQueue(stream) # wrap in a shabby queue-like api - log.info(f"Accepted new connection from {queue.peer}") - async with queue.stream: - async for broker, tickers in queue: - log.info( - f"{queue.peer} subscribed to {broker} for tickers {tickers}") + # pull global vars from local actor + ss = tractor.current_actor().statespace + broker2tickersubs = ss['broker2tickersubs'] + clients = ss['clients'] + dtasks = ss['dtasks'] + tickers = list(tickers) + log.info( + f"{chan.uid} subscribed to {broker} for tickers {tickers}") - if broker not in broker2tickersubs: - brokermod = get_brokermod(broker) + brokermod, client, _, get_quotes = await get_cached_client(broker, tickers) + if broker not in broker2tickersubs: + tickers2chans = broker2tickersubs.setdefault(broker, {}) + else: + log.info(f"Subscribing with existing `{broker}` daemon") + tickers2chans = broker2tickersubs[broker] - # TODO: move to AsyncExitStack in 3.7 - client_cntxmng = brokermod.get_client() - client = await client_cntxmng.__aenter__() - get_quotes = await brokermod.quoter(client, tickers) - clients[broker] = ( - brokermod, client, client_cntxmng, get_quotes) - tickers2qs = broker2tickersubs.setdefault(broker, {}) - else: - log.info(f"Subscribing with existing `{broker}` daemon") - brokermod, client, _, get_quotes = clients[broker] - tickers2qs = broker2tickersubs[broker] + # do a smoke quote (not this mutates the input list and filters out bad + # symbols for now) + payload = await smoke_quote(get_quotes, tickers, broker) + # push initial smoke quote response for client initialization + await chan.send({'yield': payload, 'cid': cid}) - # beginning of section to be trimmed out with #37 - ################################################# - # get a single quote filtering out any bad tickers - # NOTE: this code is always run for every new client - # subscription even when a broker quoter task is already running - # since the new client needs to know what symbols are accepted - log.warn(f"Retrieving smoke quote for {queue.peer}") - quotes = await get_quotes(tickers) - # report any tickers that aren't returned in the first quote - invalid_tickers = set(tickers) - set(quotes) - for symbol in invalid_tickers: - tickers.remove(symbol) - log.warn( - f"Symbol `{symbol}` not found by broker `{brokermod.name}`" + # update map from each symbol to requesting client's chan + modify_quote_stream(broker, tickers, chan=chan, cid=cid) + + try: + if broker not in dtasks: # no quoter task yet + # task should begin on the next checkpoint/iteration + # with trio.open_cancel_scope(shield=True): + log.info(f"Spawning quoter task for {brokermod.name}") + # await actor._root_nursery.start(partial( + async with trio.open_nursery() as nursery: + nursery.start_soon(partial( + stream_quotes, brokermod, get_quotes, tickers2chans, + cid=cid) ) - - # pop any tickers that return "empty" quotes - payload = {} - for symbol, quote in quotes.items(): - if quote is None: - log.warn( - f"Symbol `{symbol}` not found by broker" - f" `{brokermod.name}`") - tickers.remove(symbol) - continue - payload[symbol] = quote - - # end of section to be trimmed out with #37 - ########################################### - - # first respond with symbol data for all tickers (allows - # clients to receive broker specific setup info) - sd = await client.symbol_data(tickers) - assert sd, "No symbol data could be found?" - await queue.put(sd) - - # update map from each symbol to requesting client's queue - for ticker in tickers: - tickers2qs.setdefault(ticker, set()).add(queue) - - # push initial quotes response for client initialization - await queue.put(payload) - - if broker not in dtasks: # no quoter task yet - # task should begin on the next checkpoint/iteration - log.info(f"Spawning quoter task for {brokermod.name}") - nursery.start_soon( - stream_quotes, brokermod, get_quotes, tickers2qs) dtasks.add(broker) - log.debug("Waiting on subscription request") - else: - log.info(f"client @ {queue.peer} disconnected") - # drop any lingering subscriptions - for ticker, qset in tickers2qs.items(): - qset.discard(queue) + # unblocks when no more symbols subscriptions exist and the + # quote streamer task terminates (usually because another call + # was made to `modify_quoter` to unsubscribe from streaming + # symbols) + log.info(f"Terminated quoter task for {brokermod.name}") - # if there are no more subscriptions with this broker - # drop from broker subs dict - if not any(tickers2qs.values()): - log.info(f"No more subscriptions for {broker}") - broker2tickersubs.pop(broker, None) - dtasks.discard(broker) - - # TODO: move to AsyncExitStack in 3.7 - for _, _, cntxmng, _ in clients.values(): - # FIXME: yes I know it's totally wrong... - await cntxmng.__aexit__(None, None, None) + # TODO: move to AsyncExitStack in 3.7 + for _, _, cntxmng, _ in clients.values(): + # FIXME: yes I know there's no error handling.. + await cntxmng.__aexit__(None, None, None) + finally: + # if there are truly no more subscriptions with this broker + # drop from broker subs dict + if not any(tickers2chans.values()): + log.info(f"No more subscriptions for {broker}") + broker2tickersubs.pop(broker, None) + dtasks.discard(broker) -async def _daemon_main(host) -> None: - """Entry point for the broker daemon which waits for connections - before spawning micro-services. +async def _test_price_stream(broker, symbols, *, chan=None, cid=None): + """Test function for initial tractor draft. """ - # global space for broker-daemon subscriptions - broker2tickersubs = {} - clients = {} - dtasks = set() + brokermod = get_brokermod(broker) + client_cntxmng = brokermod.get_client() + client = await client_cntxmng.__aenter__() + get_quotes = await brokermod.quoter(client, symbols) + log.info(f"Spawning quoter task for {brokermod.name}") + assert chan + tickers2chans = {}.fromkeys(symbols, {(chan, cid), }) async with trio.open_nursery() as nursery: - listeners = await nursery.start( + nursery.start_soon( partial( - trio.serve_tcp, - partial( - start_quoter, broker2tickersubs, clients, - dtasks, nursery - ), - 1616, host=host, - ) + stream_quotes, brokermod, get_quotes, tickers2chans, cid=cid) ) - log.debug(f"Spawned {listeners}") diff --git a/piker/cli.py b/piker/cli.py index 7fa8edf5..a703970a 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -2,7 +2,6 @@ Console interface to broker client/daemons. """ from functools import partial -from multiprocessing import Process import json import os @@ -12,8 +11,8 @@ import trio from . import watchlists as wl from .brokers import core, get_brokermod -from .brokers.core import _daemon_main, Client from .log import get_console_log, colorize_json, get_logger +from . import tractor log = get_logger('cli') DEFAULT_BROKER = 'robinhood' @@ -22,18 +21,24 @@ _config_dir = click.get_app_dir('piker') _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') -def run(main, loglevel='info'): - get_console_log(loglevel) - return trio.run(main) - - @click.command() @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--host', '-h', default='127.0.0.1', help='Host address to bind') def pikerd(loglevel, host): """Spawn the piker daemon. """ - run(partial(_daemon_main, host), loglevel) + get_console_log(loglevel) + tractor.run( + None, # no main task - this is a daemon + statespace={ + 'broker2tickersubs': {}, + 'clients': {}, + 'dtasks': set(), + }, + outlive_main=True, # run daemon forever + rpc_module_paths=['piker.brokers.core'], + name='brokerd', + ) @click.group() @@ -52,7 +57,7 @@ def cli(): def api(meth, kwargs, loglevel, broker, keys): """client for testing broker API methods with pretty printing of output. """ - log = get_console_log(loglevel) + get_console_log(loglevel) brokermod = get_brokermod(broker) _kwargs = {} @@ -63,8 +68,9 @@ def api(meth, kwargs, loglevel, broker, keys): key, _, value = kwarg.partition('=') _kwargs[key] = value - data = run( - partial(core.api, brokermod, meth, **_kwargs), loglevel=loglevel) + data = trio.run( + partial(core.api, brokermod, meth, **_kwargs) + ) if keys: # filter to requested keys @@ -88,10 +94,12 @@ def api(meth, kwargs, loglevel, broker, keys): help='Ouput in `pandas.DataFrame` format') @click.argument('tickers', nargs=-1, required=True) def quote(loglevel, broker, tickers, df_output): - """client for testing broker API methods with pretty printing of output. + """Retreive symbol quotes on the console in either json or dataframe + format. """ brokermod = get_brokermod(broker) - quotes = run(partial(core.quote, brokermod, tickers), loglevel=loglevel) + get_console_log(loglevel) + quotes = trio.run(partial(core.quote, brokermod, tickers)) if not quotes: log.error(f"No quotes could be found for {tickers}?") return @@ -132,48 +140,28 @@ def watch(loglevel, broker, rate, name, dhost): async def launch_client(sleep=0.5, tries=10): - async def subscribe(client): - # initial subs request for symbols - await client.send((brokermod.name, tickers)) - # symbol data is returned in first response which we'll - # ignore on reconnect - await client.recv() + async with tractor.open_nursery() as nursery: + async with tractor.find_actor('brokerd') as portal: + if not portal: + log.warn("No broker daemon could be found") + log.warning("Spawning local brokerd..") + portal = await nursery.start_actor( + 'brokerd', + main=None, # no main task + statespace={ + 'broker2tickersubs': {}, + 'clients': {}, + 'dtasks': set(), + }, + outlive_main=True, # run daemon forever + rpc_module_paths=['piker.brokers.core'], + loglevel=loglevel, + ) - client = Client((dhost, 1616), on_reconnect=subscribe) - for _ in range(tries): # try for 5 seconds - try: - await client.connect() - break - except OSError as oserr: - await trio.sleep(sleep) - else: - # will raise indicating child proc should be spawned - await client.connect() + # run kivy app + await _async_main(name, portal, tickers, brokermod, rate) - async with trio.open_nursery() as nursery: - nursery.start_soon( - _async_main, name, client, tickers, - brokermod, rate - ) - - # signal exit of stream handler task - await client.aclose() - - try: - trio.run(partial(launch_client, tries=1)) - except OSError as oserr: - log.warn("No broker daemon could be found") - log.warn(oserr) - log.warning("Spawning local broker-daemon...") - child = Process( - target=run, - args=(partial(_daemon_main, dhost), loglevel), - daemon=True, - name='pikerd', - ) - child.start() - trio.run(partial(launch_client, tries=5)) - child.join() + tractor.run(partial(launch_client, tries=1), name='kivy-watchlist') @cli.group() diff --git a/piker/ipc.py b/piker/ipc.py new file mode 100644 index 00000000..7bc647cf --- /dev/null +++ b/piker/ipc.py @@ -0,0 +1,191 @@ +""" +Inter-process comms abstractions +""" +from typing import Coroutine, Tuple + +import msgpack +import trio + +from .log import get_logger +log = get_logger('ipc') + + +class StreamQueue: + """Stream wrapped as a queue that delivers ``msgpack`` serialized objects. + """ + def __init__(self, stream): + self.stream = stream + self._agen = self._iter_packets() + self._laddr = self.stream.socket.getsockname()[:2] + self._raddr = self.stream.socket.getpeername()[:2] + self._send_lock = trio.Lock() + + async def _iter_packets(self): + """Yield packets from the underlying stream. + """ + unpacker = msgpack.Unpacker(raw=False, use_list=False) + while True: + try: + data = await self.stream.receive_some(2**10) + log.trace(f"received {data}") + except trio.BrokenStreamError: + log.error(f"Stream connection {self.raddr} broke") + return + + if data == b'': + log.debug(f"Stream connection {self.raddr} was closed") + return + + unpacker.feed(data) + for packet in unpacker: + yield packet + + @property + def laddr(self): + return self._laddr + + @property + def raddr(self): + return self._raddr + + async def put(self, data): + async with self._send_lock: + return await self.stream.send_all( + msgpack.dumps(data, use_bin_type=True)) + + async def get(self): + return await self._agen.asend(None) + + async def __aiter__(self): + return self._agen + + def connected(self): + return self.stream.socket.fileno() != -1 + + +class Channel: + """A channel to actors in other processes. + + Use this to talk to any micro-service daemon or other client(s) over a + a transport managed by ``trio``. + """ + def __init__( + self, + destaddr: tuple = None, + on_reconnect: Coroutine = None, + auto_reconnect: bool = False, + stream: trio.SocketStream = None, # expected to be active + ) -> None: + self._recon_seq = on_reconnect + self._autorecon = auto_reconnect + self.squeue = StreamQueue(stream) if stream else None + if self.squeue and destaddr: + raise ValueError( + f"A stream was provided with local addr {self.laddr}" + ) + self._destaddr = destaddr or self.squeue.raddr + # set after handshake - always uid of far end + self.uid = None + + def __repr__(self): + if self.squeue: + return repr( + self.squeue.stream.socket._sock).replace( + "socket.socket", "Channel") + return object.__repr__(self) + + @property + def laddr(self): + return self.squeue.laddr if self.squeue else (None, None) + + @property + def raddr(self): + return self.squeue.raddr if self.squeue else (None, None) + + async def connect(self, destaddr: Tuple[str, int] = None, **kwargs): + if self.connected(): + raise RuntimeError("channel is already connected?") + destaddr = destaddr or self._destaddr + stream = await trio.open_tcp_stream(*destaddr, **kwargs) + self.squeue = StreamQueue(stream) + return stream + + async def send(self, item): + log.trace(f"send `{item}`") + await self.squeue.put(item) + + async def recv(self): + try: + return await self.squeue.get() + except trio.BrokenStreamError: + if self._autorecon: + await self._reconnect() + return await self.recv() + + async def aclose(self, *args): + log.debug(f"Closing {self}") + await self.squeue.stream.aclose() + + async def __aenter__(self): + await self.connect() + return self + + async def __aexit__(self, *args): + await self.aclose(*args) + + async def __aiter__(self): + return self.aiter_recv() + + async def _reconnect(self): + """Handle connection failures by polling until a reconnect can be + established. + """ + down = False + while True: + try: + with trio.move_on_after(3) as cancel_scope: + await self.connect() + cancelled = cancel_scope.cancelled_caught + if cancelled: + log.warn( + "Reconnect timed out after 3 seconds, retrying...") + continue + else: + log.warn("Stream connection re-established!") + # run any reconnection sequence + on_recon = self._recon_seq + if on_recon: + await on_recon(self) + break + except (OSError, ConnectionRefusedError): + if not down: + down = True + log.warn( + f"Connection to {self.raddr} went down, waiting" + " for re-establishment") + await trio.sleep(1) + + async def aiter_recv(self): + """Async iterate items from underlying stream. + """ + while True: + try: + async for item in self.squeue: + yield item + # sent = yield item + # if sent is not None: + # # optimization, passing None through all the + # # time is pointless + # await self.squeue.put(sent) + except trio.BrokenStreamError: + if not self._autorecon: + raise + await self.aclose() + if self._autorecon: # attempt reconnect + await self._reconnect() + continue + else: + return + + def connected(self): + return self.squeue.connected() if self.squeue else False diff --git a/piker/tractor.py b/piker/tractor.py new file mode 100644 index 00000000..cb7e121d --- /dev/null +++ b/piker/tractor.py @@ -0,0 +1,899 @@ +""" +tracor: An actor model micro-framework. +""" +from collections import defaultdict +from functools import partial +from typing import Coroutine +import importlib +import inspect +import multiprocessing as mp +import traceback +import uuid + +import trio +from async_generator import asynccontextmanager + +from .ipc import Channel +from .log import get_console_log, get_logger + +ctx = mp.get_context("forkserver") +log = get_logger('tractor') + +# set at startup and after forks +_current_actor = None +_default_arbiter_host = '127.0.0.1' +_default_arbiter_port = 1616 + + +class ActorFailure(Exception): + "General actor failure" + + +class RemoteActorError(ActorFailure): + "Remote actor exception bundled locally" + + +@asynccontextmanager +async def maybe_open_nursery(nursery=None): + """Create a new nursery if None provided. + + Blocks on exit as expected if no input nursery is provided. + """ + if nursery is not None: + yield nursery + else: + async with trio.open_nursery() as nursery: + yield nursery + + +async def _invoke( + cid, chan, func, kwargs, + treat_as_gen=False, raise_errs=False, + task_status=trio.TASK_STATUS_IGNORED +): + """Invoke local func and return results over provided channel. + """ + try: + is_async_partial = False + if isinstance(func, partial): + is_async_partial = inspect.iscoroutinefunction(func.func) + + if not inspect.iscoroutinefunction(func) and not is_async_partial: + await chan.send({'return': func(**kwargs), 'cid': cid}) + else: + coro = func(**kwargs) + + if inspect.isasyncgen(coro): + async for item in coro: + # TODO: can we send values back in here? + # it's gonna require a `while True:` and + # some non-blocking way to retrieve new `asend()` + # values from the channel: + # to_send = await chan.recv_nowait() + # if to_send is not None: + # to_yield = await coro.asend(to_send) + await chan.send({'yield': item, 'cid': cid}) + else: + if treat_as_gen: + # XXX: the async-func may spawn further tasks which push + # back values like an async-generator would but must + # manualy construct the response dict-packet-responses as + # above + await coro + else: + await chan.send({'return': await coro, 'cid': cid}) + + task_status.started() + except Exception: + if not raise_errs: + await chan.send({'error': traceback.format_exc(), 'cid': cid}) + else: + raise + + +async def result_from_q(q): + """Process a msg from a remote actor. + """ + first_msg = await q.get() + if 'return' in first_msg: + return 'return', first_msg, q + elif 'yield' in first_msg: + return 'yield', first_msg, q + elif 'error' in first_msg: + raise RemoteActorError(first_msg['error']) + else: + raise ValueError(f"{first_msg} is an invalid response packet?") + + +async def _do_handshake(actor, chan): + await chan.send(actor.uid) + uid = await chan.recv() + + if not isinstance(uid, tuple): + raise ValueError(f"{uid} is not a valid uid?!") + + chan.uid = uid + log.info(f"Handshake with actor {uid}@{chan.raddr} complete") + return uid + + +class Actor: + """The fundamental concurrency primitive. + + An *actor* is the combination of a regular Python or + ``multiprocessing.Process`` executing a ``trio`` task tree, communicating + with other actors through "portals" which provide a native async API + around "channels". + """ + is_arbiter = False + + def __init__( + self, + name: str, + main: Coroutine = None, + rpc_module_paths: [str] = [], + statespace: dict = {}, + uid: str = None, + allow_rpc: bool = True, + outlive_main: bool = False, + ): + self.name = name + self.uid = (name, uid or str(uuid.uuid1())) + self.rpc_module_paths = rpc_module_paths + self._mods = {} + self.main = main + # TODO: consider making this a dynamically defined + # @dataclass once we get py3.7 + self.statespace = statespace + self._allow_rpc = allow_rpc + self._outlive_main = outlive_main + + # filled in by `_async_main` after fork + self._peers = defaultdict(list) + self._no_more_peers = trio.Event() + self._no_more_peers.set() + self._actors2calls = {} # map {uids -> {callids -> waiter queues}} + self._listeners = [] + self._parent_chan = None + self._accept_host = None + + async def wait_for_peer(self, uid): + """Wait for a connection back from a spawned actor with a given + ``uid``. + """ + log.debug(f"Waiting for peer {uid} to connect") + event = self._peers.setdefault(uid, trio.Event()) + await event.wait() + log.debug(f"{uid} successfully connected back to us") + return event, self._peers[uid][-1] + + def load_namespaces(self): + # We load namespaces after fork since this actor may + # be spawned on a different machine from the original nursery + # and we need to try and load the local module code (if it + # exists) + for path in self.rpc_module_paths: + self._mods[path] = importlib.import_module(path) + + async def _stream_handler( + self, + stream: trio.SocketStream, + ): + """ + Entry point for new inbound connections to the channel server. + """ + self._no_more_peers.clear() + chan = Channel(stream=stream) + log.info(f"New connection to us {chan}") + + # send/receive initial handshake response + try: + uid = await _do_handshake(self, chan) + except StopAsyncIteration: + log.warn(f"Channel {chan} failed to handshake") + return + + # channel tracking + event_or_chans = self._peers.pop(uid, None) + if isinstance(event_or_chans, trio.Event): + # Instructing connection: this is likely a new channel to + # a recently spawned actor which we'd like to control via + # async-rpc calls. + log.debug(f"Waking channel waiters {event_or_chans.statistics()}") + # Alert any task waiting on this connection to come up + event_or_chans.set() + event_or_chans.clear() # consumer can wait on channel to close + elif isinstance(event_or_chans, list): + log.warn( + f"already have channel(s) for {uid}:{event_or_chans}?" + ) + # append new channel + self._peers[uid].extend(event_or_chans) + + log.debug(f"Registered {chan} for {uid}") + self._peers[uid].append(chan) + + # Begin channel management - respond to remote requests and + # process received reponses. + try: + await self._process_messages(chan) + finally: + # Drop ref to channel so it can be gc-ed and disconnected + if chan is not self._parent_chan: + log.debug(f"Releasing channel {chan}") + chans = self._peers.get(chan.uid) + chans.remove(chan) + if not chans: + log.debug(f"No more channels for {chan.uid}") + self._peers.pop(chan.uid, None) + if not self._peers: # no more channels connected + self._no_more_peers.set() + log.debug(f"No more peer channels") + + def _push_result(self, actorid, cid, msg): + assert actorid, f"`actorid` can't be {actorid}" + q = self.get_waitq(actorid, cid) + log.debug(f"Delivering {msg} from {actorid} to caller {cid}") + waiters = q.statistics().tasks_waiting_get + if not waiters: + log.warn( + f"No tasks are currently waiting for results from call {cid}?") + q.put_nowait(msg) + + def get_waitq(self, actorid, cid): + log.debug(f"Registering for callid {cid} queue results from {actorid}") + cids2qs = self._actors2calls.setdefault(actorid, {}) + return cids2qs.setdefault(cid, trio.Queue(1000)) + + async def send_cmd(self, chan, ns, func, kwargs): + """Send a ``'cmd'`` message to a remote actor and return a + caller id and a ``trio.Queue`` that can be used to wait for + responses delivered by the local message processing loop. + """ + cid = str(uuid.uuid1()) + q = self.get_waitq(chan.uid, cid) + log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") + await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) + return cid, q + + async def _process_messages(self, chan, treat_as_gen=False): + """Process messages async-RPC style. + + Process rpc requests and deliver retrieved responses from channels. + """ + # TODO: once https://github.com/python-trio/trio/issues/467 gets + # worked out we'll likely want to use that! + log.debug(f"Entering msg loop for {chan}") + async with trio.open_nursery() as nursery: + try: + async for msg in chan.aiter_recv(): + if msg is None: # terminate sentinel + log.debug(f"Cancelling all tasks for {chan}") + nursery.cancel_scope.cancel() + log.debug(f"Terminating msg loop for {chan}") + break + log.debug(f"Received msg {msg}") + cid = msg.get('cid') + if cid: # deliver response to local caller/waiter + self._push_result(chan.uid, cid, msg) + if 'error' in msg: + # TODO: need something better then this slop + raise RemoteActorError(msg['error']) + log.debug(f"Waiting on next msg for {chan}") + continue + else: + ns, funcname, kwargs, actorid, cid = msg['cmd'] + + log.debug( + f"Processing request from {actorid}\n" + f"{ns}.{funcname}({kwargs})") + if ns == 'self': + func = getattr(self, funcname) + else: + func = getattr(self._mods[ns], funcname) + + # spin up a task for the requested function + sig = inspect.signature(func) + treat_as_gen = False + if 'chan' in sig.parameters: + assert 'cid' in sig.parameters, \ + f"{func} must accept a `cid` (caller id) kwarg" + kwargs['chan'] = chan + kwargs['cid'] = cid + # TODO: eventually we want to be more stringent + # about what is considered a far-end async-generator. + # Right now both actual async gens and any async + # function which declares a `chan` kwarg in its + # signature will be treated as one. + treat_as_gen = True + + log.debug(f"Spawning task for {func}") + nursery.start_soon( + _invoke, cid, chan, func, kwargs, treat_as_gen, + name=funcname + ) + log.debug(f"Waiting on next msg for {chan}") + else: # channel disconnect + log.debug(f"{chan} disconnected") + except trio.ClosedStreamError: + log.error(f"{chan} broke") + + log.debug(f"Exiting msg loop for {chan}") + + def _fork_main(self, accept_addr, parent_addr=None, loglevel=None): + # after fork routine which invokes a fresh ``trio.run`` + log.info( + f"Started new {ctx.current_process()} for actor {self.uid}") + global _current_actor + _current_actor = self + if loglevel: + get_console_log(loglevel) + log.debug(f"parent_addr is {parent_addr}") + try: + trio.run(partial( + self._async_main, accept_addr, parent_addr=parent_addr)) + except KeyboardInterrupt: + pass # handle it the same way trio does? + log.debug(f"Actor {self.uid} terminated") + + async def _async_main( + self, + accept_addr, + arbiter_addr=(_default_arbiter_host, _default_arbiter_port), + parent_addr=None, + nursery=None + ): + """Start the channel server and main task. + + A "root-most" (or "top-level") nursery for this actor is opened here + and when cancelled effectively cancels the actor. + """ + result = None + try: + async with maybe_open_nursery(nursery) as nursery: + self._root_nursery = nursery + + # Startup up channel server + host, port = accept_addr + await nursery.start(partial( + self._serve_forever, accept_host=host, accept_port=port) + ) + + if parent_addr is not None: + # Connect back to the parent actor and conduct initial + # handshake (From this point on if we error ship the + # exception back to the parent actor) + chan = self._parent_chan = Channel( + destaddr=parent_addr, + on_reconnect=self.main + ) + await chan.connect() + # initial handshake, report who we are, who they are + await _do_handshake(self, chan) + + # handle new connection back to parent optionally + # begin responding to RPC + if self._allow_rpc: + self.load_namespaces() + if self._parent_chan: + nursery.start_soon( + self._process_messages, self._parent_chan) + + # register with the arbiter if we're told its addr + log.debug(f"Registering {self} for role `{self.name}`") + async with get_arbiter(*arbiter_addr) as arb_portal: + await arb_portal.run( + 'self', 'register_actor', + name=self.name, sockaddr=self.accept_addr) + + if self.main: + if self._parent_chan: + log.debug(f"Starting main task `{self.main}`") + # start "main" routine in a task + await nursery.start( + _invoke, 'main', self._parent_chan, self.main, {}, + False, True # treat_as_gen, raise_errs params + ) + else: + # run directly + log.debug(f"Running `{self.main}` directly") + result = await self.main() + + # terminate local in-proc once its main completes + log.debug( + f"Waiting for remaining peers {self._peers} to clear") + await self._no_more_peers.wait() + log.debug(f"All peer channels are complete") + + # tear down channel server + if not self._outlive_main: + log.debug(f"Shutting down channel server") + self.cancel_server() + + # blocks here as expected if no nursery was provided until + # the channel server is killed (i.e. this actor is + # cancelled or signalled by the parent actor) + except Exception: + if self._parent_chan: + log.exception("Actor errored:") + await self._parent_chan.send( + {'error': traceback.format_exc(), 'cid': 'main'}) + else: + raise + finally: + # UNregister actor from the arbiter + try: + if arbiter_addr is not None: + async with get_arbiter(*arbiter_addr) as arb_portal: + await arb_portal.run( + 'self', 'register_actor', + name=self.name, sockaddr=self.accept_addr) + except OSError: + log.warn(f"Unable to unregister {self.name} from arbiter") + + return result + + async def _serve_forever( + self, + *, + # (host, port) to bind for channel server + accept_host=None, + accept_port=0, + task_status=trio.TASK_STATUS_IGNORED + ): + """Main coroutine: connect back to the parent, spawn main task, begin + listening for new messages. + + """ + async with trio.open_nursery() as nursery: + self._server_nursery = nursery + # TODO: might want to consider having a separate nursery + # for the stream handler such that the server can be cancelled + # whilst leaving existing channels up + listeners = await nursery.start( + partial( + trio.serve_tcp, + self._stream_handler, + handler_nursery=self._root_nursery, + port=accept_port, host=accept_host, + ) + ) + log.debug( + f"Started tcp server(s) on {[l.socket for l in listeners]}") + self._listeners.extend(listeners) + task_status.started() + + def cancel(self): + """This cancels the internal root-most nursery thereby gracefully + cancelling (for all intents and purposes) this actor. + """ + self._root_nursery.cancel_scope.cancel() + + def cancel_server(self): + """Cancel the internal channel server nursery thereby + preventing any new inbound connections from being established. + """ + self._server_nursery.cancel_scope.cancel() + + @property + def accept_addr(self): + """Primary address to which the channel server is bound. + """ + try: + return self._listeners[0].socket.getsockname() + except OSError: + return + + def get_parent(self): + return Portal(self._parent_chan) + + def get_chans(self, actorid): + return self._peers[actorid] + + +class Arbiter(Actor): + """A special actor who knows all the other actors and always has + access to the top level nursery. + + The arbiter is by default the first actor spawned on each host + and is responsible for keeping track of all other actors for + coordination purposes. If a new main process is launched and an + arbiter is already running that arbiter will be used. + """ + _registry = defaultdict(list) + is_arbiter = True + + def find_actor(self, name): + return self._registry[name] + + def register_actor(self, name, sockaddr): + self._registry[name].append(sockaddr) + + def unregister_actor(self, name, sockaddr): + sockaddrs = self._registry.get(name) + if sockaddrs: + try: + sockaddrs.remove(sockaddr) + except ValueError: + pass + + +class Portal: + """A 'portal' to a(n) (remote) ``Actor``. + + Allows for invoking remote routines and receiving results through an + underlying ``tractor.Channel`` as though the remote (async) + function / generator was invoked locally. + + Think of this like an native async IPC API. + """ + def __init__(self, channel): + self.channel = channel + + async def aclose(self): + log.debug(f"Closing {self}") + # XXX: won't work until https://github.com/python-trio/trio/pull/460 + # gets in! + await self.channel.aclose() + + async def run(self, ns, func, **kwargs): + """Submit a function to be scheduled and run by actor, return its + (stream of) result(s). + """ + # TODO: not this needs some serious work and thinking about how + # to make async-generators the fundamental IPC API over channels! + # (think `yield from`, `gen.send()`, and functional reactive stuff) + chan = self.channel + # ship a function call request to the remote actor + actor = current_actor() + + cid, q = await actor.send_cmd(chan, ns, func, kwargs) + # wait on first response msg + resptype, first_msg, q = await result_from_q(q) + + if resptype == 'yield': + + async def yield_from_q(): + yield first_msg['yield'] + try: + async for msg in q: + try: + yield msg['yield'] + except KeyError: + raise RemoteActorError(msg['error']) + except GeneratorExit: + log.debug(f"Cancelling async gen call {cid} to {chan.uid}") + + return yield_from_q() + + elif resptype == 'return': + return first_msg['return'] + else: + raise ValueError(f"Unknown msg response type: {first_msg}") + + +@asynccontextmanager +async def open_portal(channel, nursery=None): + """Open a ``Portal`` through the provided ``channel``. + + Spawns a background task to handle rpc message processing. + """ + actor = current_actor() + assert actor + was_connected = False + + async with maybe_open_nursery(nursery) as nursery: + + if not channel.connected(): + await channel.connect() + was_connected = True + + if channel.uid is None: + await _do_handshake(actor, channel) + + if not actor.get_chans(channel.uid): + # actor is not currently managing this channel + actor._peers[channel.uid].append(channel) + + nursery.start_soon(actor._process_messages, channel) + yield Portal(channel) + + # cancel background msg loop task + nursery.cancel_scope.cancel() + if was_connected: + actor._peers[channel.uid].remove(channel) + await channel.aclose() + + +class LocalPortal: + """A 'portal' to a local ``Actor``. + + A compatibility shim for normal portals but for invoking functions + using an in process actor instance. + """ + def __init__(self, actor): + self.actor = actor + + async def run(self, ns, func, **kwargs): + """Run a requested function locally and return it's result. + """ + obj = self.actor if ns == 'self' else importlib.import_module(ns) + func = getattr(obj, func) + return func(**kwargs) + + +class ActorNursery: + """Spawn scoped subprocess actors. + """ + def __init__(self, actor, supervisor=None): + self.supervisor = supervisor + self._actor = actor + # We'll likely want some way to cancel all sub-actors eventually + # self.cancel_scope = cancel_scope + self._children = {} + + async def __aenter__(self): + return self + + async def start_actor( + self, + name: str, + main=None, + bind_addr=('127.0.0.1', 0), + statespace=None, + rpc_module_paths=None, + outlive_main=False, # sub-actors die when their main task completes + loglevel=None, # set console logging per subactor + ): + actor = Actor( + name, + # modules allowed to invoked funcs from + rpc_module_paths=rpc_module_paths, + statespace=statespace, # global proc state vars + main=main, # main coroutine to be invoked + outlive_main=outlive_main, + ) + parent_addr = self._actor.accept_addr + assert parent_addr + proc = ctx.Process( + target=actor._fork_main, + args=(bind_addr, parent_addr, loglevel), + daemon=True, + name=name, + ) + proc.start() + if not proc.is_alive(): + raise ActorFailure("Couldn't start sub-actor?") + + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it + event, chan = await self._actor.wait_for_peer(actor.uid) + # channel is up, get queue which delivers result from main routine + main_q = self._actor.get_waitq(actor.uid, 'main') + self._children[(name, proc.pid)] = (actor, proc, main_q) + + return Portal(chan) + + async def wait(self): + + async def wait_for_proc(proc): + # TODO: timeout block here? + if proc.is_alive(): + await trio.hazmat.wait_readable(proc.sentinel) + # please god don't hang + proc.join() + log.debug(f"Joined {proc}") + + # unblocks when all waiter tasks have completed + async with trio.open_nursery() as nursery: + for subactor, proc, main_q in self._children.values(): + nursery.start_soon(wait_for_proc, proc) + + async def cancel(self, hard_kill=False): + log.debug(f"Cancelling nursery") + for subactor, proc, main_q in self._children.values(): + if proc is mp.current_process(): + # XXX: does this even make sense? + await subactor.cancel() + else: + if hard_kill: + log.warn(f"Hard killing subactors {self._children}") + proc.terminate() + # send KeyBoardInterrupt (trio abort signal) to underlying + # sub-actors + # os.kill(proc.pid, signal.SIGINT) + else: + # send cancel cmd - likely no response from subactor + actor = self._actor + chans = actor.get_chans(subactor.uid) + if chans: + for chan in chans: + await actor.send_cmd(chan, 'self', 'cancel', {}) + else: + log.warn( + f"Channel for {subactor.uid} is already down?") + log.debug(f"Waiting on all subactors to complete") + await self.wait() + log.debug(f"All subactors for {self} have terminated") + + async def __aexit__(self, etype, value, tb): + """Wait on all subactor's main routines to complete. + """ + async def wait_for_actor(actor, proc, q): + if proc.is_alive(): + ret_type, msg, q = await result_from_q(q) + log.info(f"{actor.uid} main task completed with {msg}") + if not actor._outlive_main: + # trigger msg loop to break + chans = self._actor.get_chans(actor.uid) + for chan in chans: + log.info(f"Signalling msg loop exit for {actor.uid}") + await chan.send(None) + + if etype is not None: + log.warn(f"{current_actor().uid} errored with {etype}, " + "cancelling actor nursery") + await self.cancel() + else: + log.debug(f"Waiting on subactors to complete") + async with trio.open_nursery() as nursery: + for subactor, proc, main_q in self._children.values(): + nursery.start_soon(wait_for_actor, subactor, proc, main_q) + + await self.wait() + log.debug(f"Nursery teardown complete") + + +def current_actor() -> Actor: + """Get the process-local actor instance. + """ + return _current_actor + + +@asynccontextmanager +async def open_nursery(supervisor=None, loglevel='WARNING'): + """Create and yield a new ``ActorNursery``. + """ + actor = current_actor() + if not actor: + raise RuntimeError("No actor instance has been defined yet?") + + # TODO: figure out supervisors from erlang + async with ActorNursery(current_actor(), supervisor) as nursery: + yield nursery + + +class NoArbiterFound(Exception): + "Couldn't find the arbiter?" + + +async def start_actor(actor, host, port, arbiter_addr, nursery=None): + """Spawn a local actor by starting a task to execute it's main + async function. + + Blocks if no nursery is provided, in which case it is expected the nursery + provider is responsible for waiting on the task to complete. + """ + # assign process-local actor + global _current_actor + _current_actor = actor + + # start local channel-server and fake the portal API + # NOTE: this won't block since we provide the nursery + log.info(f"Starting local {actor} @ {host}:{port}") + + await actor._async_main( + accept_addr=(host, port), + parent_addr=None, + arbiter_addr=arbiter_addr, + nursery=nursery, + ) + # XXX: If spawned locally, the actor is cancelled when this + # context is complete given that there are no more active + # peer channels connected to it. + if not actor._outlive_main: + actor.cancel_server() + + # unset module state + _current_actor = None + log.info("Completed async main") + + +@asynccontextmanager +async def _connect_chan(host, port): + """Attempt to connect to an arbiter's channel server. + Return the channel on success or None on failure. + """ + chan = Channel((host, port)) + await chan.connect() + yield chan + await chan.aclose() + + +@asynccontextmanager +async def get_arbiter(host, port): + """Return a portal instance connected to a local or remote + arbiter. + """ + actor = current_actor() + if not actor: + raise RuntimeError("No actor instance has been defined yet?") + + if actor.is_arbiter: + # we're already the arbiter + # (likely a re-entrant call from the arbiter actor) + yield LocalPortal(actor) + else: + async with _connect_chan(host, port) as chan: + async with open_portal(chan) as arb_portal: + yield arb_portal + + +@asynccontextmanager +async def find_actor( + name, + arbiter_sockaddr=(_default_arbiter_host, _default_arbiter_port) +): + """Ask the arbiter to find actor(s) by name. + + Returns a sequence of unconnected portals for each matching actor + known to the arbiter (client code is expected to connect the portals). + """ + actor = current_actor() + if not actor: + raise RuntimeError("No actor instance has been defined yet?") + + async with get_arbiter(*arbiter_sockaddr) as arb_portal: + sockaddrs = await arb_portal.run('self', 'find_actor', name=name) + # TODO: return portals to all available actors - for now just + # the first one we find + if sockaddrs: + sockaddr = sockaddrs[-1] + async with _connect_chan(*sockaddr) as chan: + async with open_portal(chan) as portal: + yield portal + else: + yield + + +async def _main(async_fn, args, kwargs, name, arbiter_addr): + """Async entry point for ``tractor``. + """ + main = partial(async_fn, *args) if async_fn else None + arbiter_addr = (host, port) = arbiter_addr or ( + _default_arbiter_host, _default_arbiter_port) + # make a temporary connection to see if an arbiter exists + arbiter_found = False + try: + async with _connect_chan(host, port): + arbiter_found = True + except OSError: + log.warn(f"No actor could be found @ {host}:{port}") + + if arbiter_found: # we were able to connect to an arbiter + log.info(f"Arbiter seems to exist @ {host}:{port}") + # create a local actor and start up its main routine/task + actor = Actor( + name or 'anonymous', + main=main, + **kwargs + ) + host, port = (_default_arbiter_host, 0) + else: + # start this local actor as the arbiter + actor = Arbiter(name or 'arbiter', main=main, **kwargs) + + await start_actor(actor, host, port, arbiter_addr=arbiter_addr) + # Creates an internal nursery which shouldn't be cancelled even if + # the one opened below is (this is desirable because the arbiter should + # stay up until a re-election process has taken place - which is not + # implemented yet FYI). + + +def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs): + """Run a trio-actor async function in process. + + This is tractor's main entry and the start point for any async actor. + """ + return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr) diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index 07c720fd..21473c17 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -6,7 +6,7 @@ Launch with ``piker watch ``. (Currently there's a bunch of questrade specific stuff in here) """ from itertools import chain -from types import ModuleType +from types import ModuleType, AsyncGeneratorType import trio from kivy.uix.boxlayout import BoxLayout @@ -319,7 +319,7 @@ async def update_quotes( nursery: 'Nursery', brokermod: ModuleType, widgets: dict, - client: 'Client', + agen: AsyncGeneratorType, symbol_data: dict, first_quotes: dict ): @@ -359,7 +359,7 @@ async def update_quotes( grid.render_rows(cache) # core cell update loop - async for quotes in client.aiter_recv(): # new quotes data only + async for quotes in agen: # new quotes data only for symbol, quote in quotes.items(): record, displayable = brokermod.format_quote( quote, symbol_data=symbol_data) @@ -375,86 +375,96 @@ async def update_quotes( nursery.cancel_scope.cancel() -async def run_kivy(root, nursery): - '''Trio-kivy entry point. - ''' - await async_runTouchApp(root) # run kivy - nursery.cancel_scope.cancel() # cancel all other tasks that may be running - - -async def _async_main(name, client, tickers, brokermod, rate): +async def _async_main(name, portal, tickers, brokermod, rate): '''Launch kivy app + all other related tasks. This is started with cli command `piker watch`. ''' - # subscribe for tickers - await client.send((brokermod.name, tickers)) - # get initial symbol data (long term data including last days close price) - # TODO: need something better this this toy protocol - sd = await client.recv() + # subscribe for tickers (this performs a possible filtering + # where invalid symbols are discarded) + sd = await portal.run( + "piker.brokers.core", 'symbol_data', + broker=brokermod.name, tickers=tickers) - async with trio.open_nursery() as nursery: - # get first quotes response - log.debug("Waiting on first quote...") - quotes = await client.recv() - first_quotes = [ - brokermod.format_quote(quote, symbol_data=sd)[0] - for quote in quotes.values()] + # an async generator instance + agen = await portal.run( + "piker.brokers.core", 'start_quote_stream', + broker=brokermod.name, tickers=tickers) - if first_quotes[0].get('last') is None: - log.error("Broker API is down temporarily") - nursery.cancel_scope.cancel() - return + try: + async with trio.open_nursery() as nursery: + # get first quotes response + log.debug("Waiting on first quote...") + quotes = await agen.__anext__() + first_quotes = [ + brokermod.format_quote(quote, symbol_data=sd)[0] + for quote in quotes.values()] - # build out UI - Window.set_title(f"watchlist: {name}\t(press ? for help)") - Builder.load_string(_kv) - box = BoxLayout(orientation='vertical', padding=5, spacing=5) + if first_quotes[0].get('last') is None: + log.error("Broker API is down temporarily") + nursery.cancel_scope.cancel() + return - # define bid-ask "stacked" cells - # (TODO: needs some rethinking and renaming for sure) - bidasks = brokermod._bidasks + # build out UI + Window.set_title(f"watchlist: {name}\t(press ? for help)") + Builder.load_string(_kv) + box = BoxLayout(orientation='vertical', padding=5, spacing=5) - # add header row - headers = first_quotes[0].keys() - header = Row( - {key: key for key in headers}, - headers=headers, - bidasks=bidasks, - is_header_row=True, - size_hint=(1, None), - ) - box.add_widget(header) + # define bid-ask "stacked" cells + # (TODO: needs some rethinking and renaming for sure) + bidasks = brokermod._bidasks - # build grid - grid = TickerTable( - cols=1, - size_hint=(1, None), - ) - for ticker_record in first_quotes: - grid.append_row(ticker_record, bidasks=bidasks) - # associate the col headers row with the ticker table even though - # they're technically wrapped separately in containing BoxLayout - header.table = grid + # add header row + headers = first_quotes[0].keys() + header = Row( + {key: key for key in headers}, + headers=headers, + bidasks=bidasks, + is_header_row=True, + size_hint=(1, None), + ) + box.add_widget(header) - # mark the initial sorted column header as bold and underlined - sort_cell = header.get_cell(grid.sort_key) - sort_cell.bold = sort_cell.underline = True - grid.last_clicked_col_cell = sort_cell + # build grid + grid = TickerTable( + cols=1, + size_hint=(1, None), + ) + for ticker_record in first_quotes: + grid.append_row(ticker_record, bidasks=bidasks) + # associate the col headers row with the ticker table even though + # they're technically wrapped separately in containing BoxLayout + header.table = grid - # set up a pager view for large ticker lists - grid.bind(minimum_height=grid.setter('height')) - pager = PagerView(box, grid, nursery) - box.add_widget(pager) + # mark the initial sorted column header as bold and underlined + sort_cell = header.get_cell(grid.sort_key) + sort_cell.bold = sort_cell.underline = True + grid.last_clicked_col_cell = sort_cell - widgets = { - # 'anchor': anchor, - 'root': box, - 'grid': grid, - 'box': box, - 'header': header, - 'pager': pager, - } - nursery.start_soon(run_kivy, widgets['root'], nursery) - nursery.start_soon( - update_quotes, nursery, brokermod, widgets, client, sd, quotes) + # set up a pager view for large ticker lists + grid.bind(minimum_height=grid.setter('height')) + pager = PagerView(box, grid, nursery) + box.add_widget(pager) + + widgets = { + # 'anchor': anchor, + 'root': box, + 'grid': grid, + 'box': box, + 'header': header, + 'pager': pager, + } + nursery.start_soon( + update_quotes, nursery, brokermod, widgets, agen, sd, quotes) + + # Trio-kivy entry point. + await async_runTouchApp(widgets['root']) # run kivy + await agen.aclose() # cancel aysnc gen call + finally: + # un-subscribe from symbols stream + await portal.run( + "piker.brokers.core", 'modify_quote_stream', + broker=brokermod.name, tickers=[]) + + # cancel GUI update task + nursery.cancel_scope.cancel() diff --git a/tests/test_tractor.py b/tests/test_tractor.py new file mode 100644 index 00000000..7200dfb0 --- /dev/null +++ b/tests/test_tractor.py @@ -0,0 +1,143 @@ +""" +Actor model API testing +""" +import time +from functools import partial + +import pytest +import trio +from piker import tractor + + +@pytest.fixture +def us_symbols(): + return ['TSLA', 'AAPL', 'CGC', 'CRON'] + + +@pytest.mark.trio +async def test_no_arbitter(): + """An arbitter must be established before any nurseries + can be created. + + (In other words ``tractor.run`` must be used instead of ``trio.run`` as is + done by the ``pytest-trio`` plugin.) + """ + with pytest.raises(RuntimeError): + with tractor.open_nursery(): + pass + + +def test_local_actor_async_func(): + """Verify a simple async function in-process. + """ + nums = [] + + async def print_loop(): + # arbiter is started in-proc if dne + assert tractor.current_actor().is_arbiter + + for i in range(10): + nums.append(i) + await trio.sleep(0.1) + + start = time.time() + tractor.run(print_loop) + + # ensure the sleeps were actually awaited + assert time.time() - start >= 1 + assert nums == list(range(10)) + + +# NOTE: this func must be defined at module level in order for the +# interal pickling infra of the forkserver to work +async def spawn(is_arbiter): + statespace = {'doggy': 10, 'kitty': 4} + namespaces = ['piker.brokers.core'] + + await trio.sleep(0.1) + actor = tractor.current_actor() + assert actor.is_arbiter == is_arbiter + + # arbiter should always have an empty statespace as it's redundant + assert actor.statespace == statespace + + if actor.is_arbiter: + async with tractor.open_nursery() as nursery: + # forks here + portal = await nursery.start_actor( + 'sub-actor', + main=partial(spawn, False), + statespace=statespace, + rpc_module_paths=namespaces, + ) + assert len(nursery._children) == 1 + assert portal.channel.uid in tractor.current_actor()._peers + else: + return 10 + + +def test_local_arbiter_subactor_global_state(): + statespace = {'doggy': 10, 'kitty': 4} + tractor.run( + spawn, + True, + name='arbiter', + statespace=statespace, + ) + + +async def rx_price_quotes_from_brokerd(us_symbols): + """Verify we can spawn a daemon actor and retrieve streamed price data. + """ + async with tractor.find_actor('brokerd') as portals: + if not portals: + # only one per host address, spawns an actor if None + async with tractor.open_nursery() as nursery: + # no brokerd actor found + portal = await nursery.start_actor( + 'brokerd', + rpc_module_paths=['piker.brokers.core'], + statespace={ + 'brokers2tickersubs': {}, + 'clients': {}, + 'dtasks': set() + }, + main=None, # don't start a main func - use rpc + ) + + # gotta expose in a broker agnostic way... + # retrieve initial symbol data + # sd = await portal.run( + # 'piker.brokers.core', 'symbol_data', symbols=us_symbols) + # assert list(sd.keys()) == us_symbols + + gen = await portal.run( + 'piker.brokers.core', + '_test_price_stream', + broker='robinhood', + symbols=us_symbols, + ) + # it'd sure be nice to have an asyncitertools here... + async for quotes in gen: + assert quotes + for key in quotes: + assert key in us_symbols + break + # terminate far-end async-gen + # await gen.asend(None) + # break + + # stop all spawned subactors + await nursery.cancel() + + # arbitter is cancelled here due to `find_actors()` internals + # (which internally uses `get_arbiter` which kills its channel + # server scope on exit) + + +def test_rx_price_quotes_from_brokerd(us_symbols): + tractor.run( + rx_price_quotes_from_brokerd, + us_symbols, + name='arbiter', + )