Dask cluster issues (scheduler?) #11

Open
ohiat opened this issue Jan 4, 2021 · 16 comments

@ohiat
Contributor

ohiat commented Jan 4, 2021

I've been working to scale our HLS notebooks out over larger datasets and larger clusters and have run into some issues:

  1. If a lot of tiles are submitted at once, the scheduler seems to struggle to handle the number of tasks being submitted.
  2. Long-running jobs that submit a lot of tasks over time seem to eventually slow the scheduler down to a crawl, even when the number of tasks in flight at any one time is limited.

Things I've investigated:

  1. Limiting the number of tiles being processed at once. This works, but over time task scheduling and execution seem to slow down and eventually break.
  2. Increasing scheduler resources (more memory, more threads). This doesn't seem to solve it.
  3. Checking the number of open file descriptors for the scheduler. It doesn't seem to get anywhere close to the max, though it could still potentially be the problem.
  4. Decreasing the worker count.
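
Capping how many tiles are in flight (item 1 above) can be sketched roughly as follows. This is a minimal illustration that uses the standard library's concurrent.futures as a stand-in for the cluster; `run_bounded`, `process_tile`, and `max_in_flight` are hypothetical names, not code from the notebooks. With Dask, `distributed.wait` accepts the same `return_when="FIRST_COMPLETED"` option for futures returned by `Client.submit`.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_bounded(executor, func, items, max_in_flight=5):
    """Apply func to every item, keeping at most max_in_flight futures pending."""
    pending = set()
    results = []
    for item in items:
        if len(pending) >= max_in_flight:
            # Block until at least one running job finishes before adding more.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            results.extend(future.result() for future in done)
        pending.add(executor.submit(func, item))
    # Drain whatever is still running once everything has been submitted.
    done, _ = wait(pending)
    results.extend(future.result() for future in done)
    return results


def process_tile(tile_id):
    # Hypothetical placeholder for the real per-tile computation.
    return f"tile {tile_id} done"


with ThreadPoolExecutor(max_workers=5) as pool:
    finished = run_bounded(pool, process_tile, range(12), max_in_flight=5)
```

Results come back in completion order rather than submission order, so a real job would need to carry the tile ID along with each result.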

More investigation is required...

ohiat added the bug and infrastructure labels Jan 4, 2021
ohiat self-assigned this Jan 4, 2021
@ohiat
Contributor Author

ohiat commented Jan 13, 2021

OK, I've replicated the issue of the cluster starting to slow down and have read more on worker memory management, which seems to be the culprit.

I ran a long-running job over 190 HLS tiles (WA 2015-2019), processing 5 tiles concurrently. After 46 tiles had completed, the job stalled when a worker stopped. I've attached the logs of the worker (port 44725) and scheduler.

logs_ tls___10.244.13.8_44725.txt
scheduler_logs.txt

Here are some screenshots of the cluster dashboards after it froze:
[Five screenshots of the cluster dashboards, captured 2021-01-13 between 1:22 PM and 1:24 PM]

@ohiat
Contributor Author

ohiat commented Jan 13, 2021

From my (new) understanding of how memory management works, it's likely that the workers slow down because they start to consume more memory. There are two potential reasons: a memory leak, or each tile job getting larger as we progress through the years and the Sentinel satellites come online. Workers slow down once they start spilling memory to disk (if they can); however, every worker frequently writes logs of the form:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 3.04 GB -- Worker memory limit: 4.29 GB

At some point the scheduler decides to kill the worker that ends up freezing the computation, but it doesn't seem to succeed:

distributed.scheduler - INFO - Remove worker <Worker 'tls://10.244.13.8:44725', name: dask-worker-4da5e804e1b94265a1903fcff5d6b150-76ktk, memory: 0, processing: 19>

distributed.scheduler - INFO - Unexpected worker completed task, likely due to work stealing. Expected: <Worker 'tls://10.244.11.7:36307', name: dask-worker-4da5e804e1b94265a1903fcff5d6b150-zphfs, memory: 11, processing: 0>, Got: <Worker 'tls://10.244.13.8:44725', name: dask-worker-4da5e804e1b94265a1903fcff5d6b150-76ktk, memory: 22, processing: 24>, Key: fetch_band_url-21776f40628a270c9d4381dfb5837017

distributed.scheduler - INFO - Receive client connection: Client-worker-f89c938d-55e4-11eb-8013-b2deec2cc72b

In the worker we see:

distributed.worker - ERROR - 'fetch_band_url-524a75207a0f04dd3146eeef93a12878'
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2227, in update_who_has
    self.tasks[dep].who_has.update(workers)
KeyError: 'fetch_band_url-524a75207a0f04dd3146eeef93a12878'

distributed.worker - ERROR - 'fetch_band_url-524a75207a0f04dd3146eeef93a12878'
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1880, in ensure_communicating
    to_gather, total_nbytes = self.select_keys_for_gather(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1985, in select_keys_for_gather
    total_bytes = self.tasks[dep].get_nbytes()
KeyError: 'fetch_band_url-524a75207a0f04dd3146eeef93a12878'

distributed.worker - ERROR - 'fetch_band_url-524a75207a0f04dd3146eeef93a12878'
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 978, in handle_scheduler
    await self.handle_stream(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 598, in handle_stream
    func()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1880, in ensure_communicating
    to_gather, total_nbytes = self.select_keys_for_gather(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1985, in select_keys_for_gather
    total_bytes = self.tasks[dep].get_nbytes()
KeyError: 'fetch_band_url-524a75207a0f04dd3146eeef93a12878'

distributed.worker - INFO - Connection to scheduler broken. Reconnecting...

To fix this problem I think I need to:

  1. Increase worker memory
  2. Try to track down memory leaks
  3. Figure out what our worker memory configs are and modify them.
  4. Understand why the scheduler couldn't successfully kill and restart this worker and keep on working away as one would hope.
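
For item 3, one possible way to pin the worker memory thresholds is through environment variables, since Dask maps `DASK_`-prefixed variables with double underscores onto nested config keys. The sketch below only builds the variable names; `dask_env_var` and `worker_env` are hypothetical names, the fractions shown match the defaults that distributed documents and are placeholders to adjust, and how the variables reach the worker pods depends on the deployment and is not shown.

```python
def dask_env_var(key):
    """Translate a dotted Dask config key into its environment-variable name."""
    return "DASK_" + key.upper().replace(".", "__")


# Thresholds as fractions of the worker memory limit (placeholder values).
memory_settings = {
    "distributed.worker.memory.target": 0.60,     # start moving data to disk
    "distributed.worker.memory.spill": 0.70,      # spill based on process memory
    "distributed.worker.memory.pause": 0.80,      # stop running new tasks
    "distributed.worker.memory.terminate": 0.95,  # nanny restarts the worker
}

# Mapping that could be handed to whatever launches the worker processes.
worker_env = {dask_env_var(key): str(value) for key, value in memory_settings.items()}
```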

@ohiat
Contributor Author

ohiat commented Jan 13, 2021

I replicated this issue on this notebook

@ohiat
Contributor Author

ohiat commented Jan 13, 2021

Confirmed that a worker over 80% memory pauses:
distributed.worker - WARNING - Worker is at 88% memory usage. Pausing worker. Process memory: 3.79 GB -- Worker memory limit: 4.29 GB
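
The memory warnings quoted in this thread can be checked against the thresholds with a little arithmetic. The sketch below assumes the documented default fractions (0.60 target, 0.70 spill, 0.80 pause, 0.95 terminate) and the log wording shown above; `memory_state` and `LOG_PATTERN` are hypothetical helper names.

```python
import re

# Assumed default thresholds, checked from most to least severe.
THRESHOLDS = [("terminate", 0.95), ("pause", 0.80), ("spill", 0.70), ("target", 0.60)]

LOG_PATTERN = re.compile(
    r"Process memory: ([\d.]+) GB -- Worker memory limit: ([\d.]+) GB"
)


def memory_state(log_line):
    """Return (state, fraction of limit used) for a worker memory warning."""
    match = LOG_PATTERN.search(log_line)
    if match is None:
        return None
    used_gb, limit_gb = float(match.group(1)), float(match.group(2))
    fraction = used_gb / limit_gb
    for name, threshold in THRESHOLDS:
        if fraction >= threshold:
            return name, round(fraction, 2)
    return "ok", round(fraction, 2)


paused = memory_state(
    "Worker is at 88% memory usage. Pausing worker. "
    "Process memory: 3.79 GB -- Worker memory limit: 4.29 GB"
)
spilling = memory_state(
    "Memory use is high but worker has no data to store to disk. "
    "Process memory: 3.04 GB -- Worker memory limit: 4.29 GB"
)
```

Under these assumptions, 3.79 GB of 4.29 GB is about 88% (past the pause threshold), while 3.04 GB is about 71%, just past the spill threshold.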

@TomAugspurger

If you're able to, capturing a performance report might help with debugging.

Option 1 does sound by far the easiest.

Fair warning: you're running into an issue that's been a headache for dask users for years. Lots of things look like a memory leak (see this search on the distributed issue tracker, dask/dask#3530, and others). Most / all of the time it isn't actually dask / distributed's fault, but it ends up exhausting worker memory. Dask workers do not behave well when they're close to their memory limit (dask/distributed#2602).

I'll try to take a look at the notebook later today or tomorrow to see if anything stands out.
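
A minimal sketch of capturing such a performance report, assuming a `distributed.Client` is already connected to the cluster: `performance_report` is the context manager exported by `dask.distributed`, while `run_with_report` and the file name are hypothetical.

```python
def run_with_report(compute, filename="dask-report.html"):
    """Run compute() while recording a Dask performance report to filename."""
    # Imported lazily so the helper can be defined without a cluster at hand.
    from dask.distributed import performance_report

    with performance_report(filename=filename):
        return compute()
```

A call might look like `run_with_report(lambda: job.compute())`, where `job` stands for whatever Dask collection the notebook builds; the HTML file is written when the block exits.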

@ohiat
Contributor Author

ohiat commented Jan 15, 2021

Thanks for the tips @TomAugspurger. Here is a run from today that eventually hung (despite me reducing the number of tiles I process concurrently from 5 to 2, and bumping memory per worker to 8 GB).

I didn't observe serious memory pressure on any worker during this run. I'm not sure whether the cause and effect is (a) the worker has issues, then the scheduler tries to remove it (and fails), or (b) the scheduler tries to remove the worker, then the worker fails (but isn't properly removed). Looking at the performance report, the bandwidth matrix looks strange for the failed worker (it didn't communicate with many others), but maybe that's an artifact of when the matrix is generated? Also odd is that the number of tasks is exactly 100k.

performance report
worker_logs_20200114.txt
scheduler_logs_20200114.txt

I will dig through dask issues to see if there is anything I can find there

@ohiat
Contributor Author

ohiat commented Jan 21, 2021

@TomAugspurger Any ideas here, especially around how to ensure a worker is properly removed when it gets disconnected from the scheduler? When the worker and scheduler get disconnected, the worker is never removed; it just hangs and is marked red on the cluster's workers page. I'm a little lost here. Even with force-restarting the cluster every N jobs this still occasionally happens, so I assume it's just a natural networking issue, for example when a worker gets moved to a different node in AKS.

Here's the scheduler log:

distributed.scheduler - INFO - Remove worker <Worker 'tls://10.244.87.6:36309', name: dask-worker-4582d2f3eb2e4a70ad644ecd6723f596-w7lxg, memory: 0, processing: 5>

distributed.scheduler - INFO - Unexpected worker completed task, likely due to work stealing. Expected: <Worker 'tls://10.244.2.118:42947', name: dask-worker-4582d2f3eb2e4a70ad644ecd6723f596-7gkl7, memory: 21, processing: 0>, Got: <Worker 'tls://10.244.87.6:36309', name: dask-worker-4582d2f3eb2e4a70ad644ecd6723f596-w7lxg, memory: 7, processing: 6>, Key: ('broadcast_to-nanmedian-rechunk-merge-concatenate-1e38bbc91b0e017fbfcab78c75f06d49', 1, 3, 2)

and the corresponding worker log:

distributed.worker - ERROR - "('where-96cb7d216838bc986abb3df3360e5b6b', 139, 0, 0)"
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1880, in ensure_communicating
    to_gather, total_nbytes = self.select_keys_for_gather(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1985, in select_keys_for_gather
    total_bytes = self.tasks[dep].get_nbytes()
KeyError: "('where-96cb7d216838bc986abb3df3360e5b6b', 139, 0, 0)"

distributed.worker - ERROR - "('where-96cb7d216838bc986abb3df3360e5b6b', 139, 0, 0)"
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2227, in update_who_has
    self.tasks[dep].who_has.update(workers)
KeyError: "('where-96cb7d216838bc986abb3df3360e5b6b', 139, 0, 0)"

distributed.worker - ERROR - "('where-96cb7d216838bc986abb3df3360e5b6b', 139, 0, 0)"
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1880, in ensure_communicating
    to_gather, total_nbytes = self.select_keys_for_gather(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1985, in select_keys_for_gather
    total_bytes = self.tasks[dep].get_nbytes()
KeyError: "('where-96cb7d216838bc986abb3df3360e5b6b', 139, 0, 0)"

distributed.worker - ERROR - "('where-96cb7d216838bc986abb3df3360e5b6b', 139, 0, 0)"
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 978, in handle_scheduler
    await self.handle_stream(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 598, in handle_stream
    func()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1880, in ensure_communicating
    to_gather, total_nbytes = self.select_keys_for_gather(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1985, in select_keys_for_gather
    total_bytes = self.tasks[dep].get_nbytes()
KeyError: "('where-96cb7d216838bc986abb3df3360e5b6b', 139, 0, 0)"

distributed.worker - INFO - Connection to scheduler broken. Reconnecting...

distributed.worker - INFO - Comm closed

distributed.worker - INFO - Comm closed

distributed.worker - ERROR -
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1613, in transition_flight_memory
    self.batched_stream.send({"op": "add-keys", "keys": [ts.key]})
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/batched.py", line 136, in send
    raise CommClosedError
distributed.comm.core.CommClosedError

distributed.worker - INFO - -------------------------------------------------

distributed.worker - ERROR -
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1613, in transition_flight_memory
    self.batched_stream.send({"op": "add-keys", "keys": [ts.key]})
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/batched.py", line 136, in send
    raise CommClosedError
distributed.comm.core.CommClosedError
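
This does not answer the removal question, but a worker that the scheduler still lists and has stopped hearing from could perhaps be spotted from the client side. The sketch below assumes the dictionary returned by `Client.scheduler_info()` has a `"workers"` mapping whose entries carry a `"last_seen"` timestamp in epoch seconds; that shape is an assumption to verify against the installed version, and `stale_workers` and `max_silence_s` are hypothetical names.

```python
import time


def stale_workers(scheduler_info, max_silence_s=60.0, now=None):
    """List worker addresses whose last heartbeat is older than max_silence_s."""
    now = time.time() if now is None else now
    stale = []
    # Assumed shape: {"workers": {address: {"last_seen": epoch_seconds, ...}}}
    for address, info in scheduler_info.get("workers", {}).items():
        last_seen = info.get("last_seen")
        if last_seen is not None and now - last_seen > max_silence_s:
            stale.append(address)
    return sorted(stale)
```

With a live client this might be called as `stale_workers(client.scheduler_info())`; whether the scheduler can then actually drop those workers is exactly the open question in this thread.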

@ohiat
Contributor Author

ohiat commented Jan 21, 2021

Next try: starting the workers with --no-reconnect, so that a worker that loses its connection to the scheduler shuts down instead of trying to reconnect.

@TomAugspurger

TomAugspurger commented Jan 22, 2021 via email

@ohiat
Contributor Author

ohiat commented Jan 22, 2021

current branch, current notebook

The zarr needed for reproducing issues is zipped here (it's just a dataset of HLS tiles/scenes from Azure open datasets):
hls_west_2015-2019.zarr.zip

It appears that starting workers with --no-reconnect solves the issue I was running into, but it has either caused or exposed another issue: workers fail to communicate among themselves and eventually hang.

Some example logs from workers that got stuck at one point:

distributed.worker - INFO - Start worker at: tls://10.244.34.131:44507

distributed.worker - INFO - Listening to: tls://10.244.34.131:44507

distributed.worker - INFO - dashboard at: 10.244.34.131:8787

distributed.worker - INFO - Waiting to connect to: tls://dask-ef1cf406cfce473881344f2a34edf0bb.default:8786

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO - Threads: 1

distributed.worker - INFO - Memory: 8.59 GB

distributed.worker - INFO - Local Directory: /home/jovyan/dask-worker-space/dask-worker-space/worker-j8kqt3id

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO - Starting Worker plugin /tmp/tmphhki7hgc/source.zipe9fe2554-da91-450d-b57f-5b8c711d380a

distributed.worker - INFO - Registered to: tls://dask-ef1cf406cfce473881344f2a34edf0bb.default:8786

distributed.worker - INFO - -------------------------------------------------

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.119.29:43663 Traceback (most recent call last): File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer bytes_read = self.read_from_fd(buf) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd return self.socket.recv_into(buf, len(buf)) File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into return self.read(nbytes, buffer) File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read return self._sslobj.read(len, buffer) TimeoutError: [Errno 110] Connection timed out The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep response = await get_data_from_worker( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker return await retry_operation(_get_data, operation="get_data_from_worker") File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation return await retry( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry return await coro() File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3249, in _get_data response = await send_recv( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 661, in send_recv response = await comm.read(deserializers=deserializers) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read convert_stream_closed_error(self, e) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error raise CommClosedError( 
distributed.comm.core.CommClosedError: in <closed TLS>: TimeoutError: [Errno 110] Connection timed out

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.119.27:34409 Traceback (most recent call last): File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer bytes_read = self.read_from_fd(buf) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd return self.socket.recv_into(buf, len(buf)) File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into return self.read(nbytes, buffer) File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read return self._sslobj.read(len, buffer) TimeoutError: [Errno 110] Connection timed out The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep response = await get_data_from_worker( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker return await retry_operation(_get_data, operation="get_data_from_worker") File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation return await retry( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry return await coro() File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3249, in _get_data response = await send_recv( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 661, in send_recv response = await comm.read(deserializers=deserializers) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read convert_stream_closed_error(self, e) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error raise CommClosedError( 
distributed.comm.core.CommClosedError: in <closed TLS>: TimeoutError: [Errno 110] Connection timed out

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.119.28:33055 Traceback (most recent call last): File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer bytes_read = self.read_from_fd(buf) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd return self.socket.recv_into(buf, len(buf)) File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into return self.read(nbytes, buffer) File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read return self._sslobj.read(len, buffer) TimeoutError: [Errno 110] Connection timed out The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep response = await get_data_from_worker( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker return await retry_operation(_get_data, operation="get_data_from_worker") File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation return await retry( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry return await coro() File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3249, in _get_data response = await send_recv( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 661, in send_recv response = await comm.read(deserializers=deserializers) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read convert_stream_closed_error(self, e) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error raise CommClosedError( 
distributed.comm.core.CommClosedError: in <closed TLS>: TimeoutError: [Errno 110] Connection timed out

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.83.42:35203 Traceback (most recent call last): File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer bytes_read = self.read_from_fd(buf) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd return self.socket.recv_into(buf, len(buf)) File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into return self.read(nbytes, buffer) File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read return self._sslobj.read(len, buffer) TimeoutError: [Errno 110] Connection timed out The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep response = await get_data_from_worker( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker return await retry_operation(_get_data, operation="get_data_from_worker") File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation return await retry( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry return await coro() File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3249, in _get_data response = await send_recv( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 661, in send_recv response = await comm.read(deserializers=deserializers) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read convert_stream_closed_error(self, e) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error raise CommClosedError( 
distributed.comm.core.CommClosedError: in <closed TLS>: TimeoutError: [Errno 110] Connection timed out

distributed.worker - INFO - Can't find dependencies for key ('nanmedian-rechunk-merge-broadcast_to-d7bf2b94a440e86f093d8a6cf2d6fcf8', 0, 0, 1)

distributed.worker - INFO - Dependent not found: ('rechunk-split-24ade7bf47b032c37f9a606b31c38eaa', 11) 0 . Asking scheduler

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.119.27:34409 Traceback (most recent call last): File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 286, in connect comm = await asyncio.wait_for( File "/srv/conda/envs/notebook/lib/python3.8/asyncio/tasks.py", line 498, in wait_for raise exceptions.TimeoutError() asyncio.exceptions.TimeoutError The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep response = await get_data_from_worker( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker return await retry_operation(_get_data, operation="get_data_from_worker") File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation return await retry( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry return await coro() File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3246, in _get_data comm = await rpc.connect(worker) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 1026, in connect comm = await connect( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect raise IOError( OSError: Timed out trying to connect to tls://10.244.119.27:34409 after 10 s

distributed.worker - ERROR - "('rechunk-split-3b0f212184eaadfa158b6d51bcbaa0fa', 6)" Traceback (most recent call last): File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2227, in update_who_has self.tasks[dep].who_has.update(workers) KeyError: "('rechunk-split-3b0f212184eaadfa158b6d51bcbaa0fa', 6)"

distributed.worker - INFO - Can't find dependencies for key ('broadcast_to-nanmedian-rechunk-merge-concatenate-1adffda5fd3259f9122dea2f1a3cce03', 2, 0, 1)

distributed.worker - INFO - Dependent not found: ('rechunk-split-42ac7482f5741a200042d6c941151697', 11) 0 . Asking scheduler
distributed.worker - INFO - Start worker at: tls://10.244.83.43:43483

distributed.worker - INFO - Listening to: tls://10.244.83.43:43483

distributed.worker - INFO - dashboard at: 10.244.83.43:8787

distributed.worker - INFO - Waiting to connect to: tls://dask-ef1cf406cfce473881344f2a34edf0bb.default:8786

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO - Threads: 1

distributed.worker - INFO - Memory: 8.59 GB

distributed.worker - INFO - Local Directory: /home/jovyan/dask-worker-space/dask-worker-space/worker-1eufsinh

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO - Starting Worker plugin /tmp/tmphhki7hgc/source.zipe9fe2554-da91-450d-b57f-5b8c711d380a

distributed.worker - INFO - Registered to: tls://dask-ef1cf406cfce473881344f2a34edf0bb.default:8786

distributed.worker - INFO - -------------------------------------------------

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.34.129:36957 Traceback (most recent call last): File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer bytes_read = self.read_from_fd(buf) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd return self.socket.recv_into(buf, len(buf)) File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into return self.read(nbytes, buffer) File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read return self._sslobj.read(len, buffer) ConnectionResetError: [Errno 104] Connection reset by peer The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep response = await get_data_from_worker( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker return await retry_operation(_get_data, operation="get_data_from_worker") File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation return await retry( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry return await coro() File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3249, in _get_data response = await send_recv( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 661, in send_recv response = await comm.read(deserializers=deserializers) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read convert_stream_closed_error(self, e) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error raise CommClosedError( 
distributed.comm.core.CommClosedError: in <closed TLS>: ConnectionResetError: [Errno 104] Connection reset by peer

distributed.worker - INFO - Can't find dependencies for key ('broadcast_to-nanmedian-rechunk-merge-concatenate-183112f9b62d393d5182a6509d6d5d38', 1, 1, 1)

distributed.worker - INFO - Dependent not found: ('rechunk-split-d5beaad9d93e8272884b8432d3f136c1', 26) 0 . Asking scheduler

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.34.130:46063 Traceback (most recent call last): File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer bytes_read = self.read_from_fd(buf) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd return self.socket.recv_into(buf, len(buf)) File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into return self.read(nbytes, buffer) File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read return self._sslobj.read(len, buffer) ConnectionResetError: [Errno 104] Connection reset by peer The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep response = await get_data_from_worker( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker return await retry_operation(_get_data, operation="get_data_from_worker") File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation return await retry( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry return await coro() File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3249, in _get_data response = await send_recv( File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 661, in send_recv response = await comm.read(deserializers=deserializers) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read convert_stream_closed_error(self, e) File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error raise CommClosedError( 
distributed.comm.core.CommClosedError: in <closed TLS>: ConnectionResetError: [Errno 104] Connection reset by peer

distributed.worker - INFO - Can't find dependencies for key ('broadcast_to-nanmedian-rechunk-merge-concatenate-4d7bdfa6c1f1b6294c8da9fa51d2e20e', 10, 1, 0)

distributed.worker - INFO - Dependent not found: ('rechunk-split-6a9e3b2313558b31532b371880f7c742', 23) 0 . Asking scheduler

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.34.131:44507
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
ConnectionResetError: [Errno 104] Connection reset by peer
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep
    response = await get_data_from_worker(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3249, in _get_data
    response = await send_recv(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 661, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error
    raise CommClosedError(
distributed.comm.core.CommClosedError: in <closed TLS>: ConnectionResetError: [Errno 104] Connection reset by peer

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.34.132:35449
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
ConnectionResetError: [Errno 104] Connection reset by peer
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep
    response = await get_data_from_worker(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3249, in _get_data
    response = await send_recv(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 661, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error
    raise CommClosedError(
distributed.comm.core.CommClosedError: in <closed TLS>: ConnectionResetError: [Errno 104] Connection reset by peer

distributed.worker - INFO - Can't find dependencies for key ('nanmedian-rechunk-merge-broadcast_to-35c2429854572644e3f7ad0801917070', 0, 1, 1)

distributed.worker - INFO - Dependent not found: ('rechunk-split-ac4250ef28cf6d0386f524aa54efb469', 38) 0 . Asking scheduler

distributed.worker - ERROR - failed during get data with tls://10.244.83.43:43483 -> tls://10.244.34.129:36957
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
TimeoutError: [Errno 110] Connection timed out
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1348, in get_data
    response = await comm.read(deserializers=serializers)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error
    raise CommClosedError(
distributed.comm.core.CommClosedError: in <closed TLS>: TimeoutError: [Errno 110] Connection timed out

distributed.worker - ERROR - failed during get data with tls://10.244.83.43:43483 -> tls://10.244.34.130:46063
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
TimeoutError: [Errno 110] Connection timed out
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1348, in get_data
    response = await comm.read(deserializers=serializers)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error
    raise CommClosedError(
distributed.comm.core.CommClosedError: in <closed TLS>: TimeoutError: [Errno 110] Connection timed out

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.34.130:46063
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 286, in connect
    comm = await asyncio.wait_for(
  File "/srv/conda/envs/notebook/lib/python3.8/asyncio/tasks.py", line 498, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep
    response = await get_data_from_worker(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3246, in _get_data
    comm = await rpc.connect(worker)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 1026, in connect
    comm = await connect(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
    raise IOError(
OSError: Timed out trying to connect to tls://10.244.34.130:46063 after 10 s

distributed.worker - INFO - Can't find dependencies for key ('broadcast_to-nanmedian-rechunk-merge-concatenate-4d7bdfa6c1f1b6294c8da9fa51d2e20e', 10, 2, 1)

distributed.worker - INFO - Dependent not found: ('rechunk-split-6a9e3b2313558b31532b371880f7c742', 51) 0 . Asking scheduler

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.34.129:36957
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 286, in connect
    comm = await asyncio.wait_for(
  File "/srv/conda/envs/notebook/lib/python3.8/asyncio/tasks.py", line 498, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep
    response = await get_data_from_worker(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3246, in _get_data
    comm = await rpc.connect(worker)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 1026, in connect
    comm = await connect(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
    raise IOError(
OSError: Timed out trying to connect to tls://10.244.34.129:36957 after 10 s

distributed.worker - INFO - Can't find dependencies for key ('broadcast_to-nanmedian-rechunk-merge-concatenate-2791123329dde31314736fdb18d38fdf', 10, 0, 3)

distributed.worker - INFO - Dependent not found: ('rechunk-split-d5beaad9d93e8272884b8432d3f136c1', 26) 1 . Asking scheduler

distributed.worker - INFO - Dependent not found: ('rechunk-split-bf9acf326f74741d9689c773f1431c02', 18) 0 . Asking scheduler

distributed.worker - INFO - Can't find dependencies for key ('broadcast_to-nanmedian-rechunk-merge-concatenate-183112f9b62d393d5182a6509d6d5d38', 1, 1, 1)

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.34.130:46063
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 286, in connect
    comm = await asyncio.wait_for(
  File "/srv/conda/envs/notebook/lib/python3.8/asyncio/tasks.py", line 498, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep
    response = await get_data_from_worker(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3246, in _get_data
    comm = await rpc.connect(worker)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 1026, in connect
    comm = await connect(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
    raise IOError(
OSError: Timed out trying to connect to tls://10.244.34.130:46063 after 10 s

distributed.worker - INFO - Can't find dependencies for key ('broadcast_to-nanmedian-rechunk-merge-concatenate-4d7bdfa6c1f1b6294c8da9fa51d2e20e', 10, 0, 2)

distributed.worker - INFO - Dependent not found: ('rechunk-split-6a9e3b2313558b31532b371880f7c742', 16) 0 . Asking scheduler

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.34.132:35449
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 286, in connect
    comm = await asyncio.wait_for(
  File "/srv/conda/envs/notebook/lib/python3.8/asyncio/tasks.py", line 498, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep
    response = await get_data_from_worker(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3246, in _get_data
    comm = await rpc.connect(worker)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 1026, in connect
    comm = await connect(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
    raise IOError(
OSError: Timed out trying to connect to tls://10.244.34.132:35449 after 10 s

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.34.129:36957
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 286, in connect
    comm = await asyncio.wait_for(
  File "/srv/conda/envs/notebook/lib/python3.8/asyncio/tasks.py", line 498, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep
    response = await get_data_from_worker(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3246, in _get_data
    comm = await rpc.connect(worker)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 1026, in connect
    comm = await connect(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
    raise IOError(
OSError: Timed out trying to connect to tls://10.244.34.129:36957 after 10 s

distributed.worker - INFO - Start worker at: tls://10.244.83.42:35203

distributed.worker - INFO - Listening to: tls://10.244.83.42:35203

distributed.worker - INFO - dashboard at: 10.244.83.42:8787

distributed.worker - INFO - Waiting to connect to: tls://dask-ef1cf406cfce473881344f2a34edf0bb.default:8786

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO - Threads: 1

distributed.worker - INFO - Memory: 8.59 GB

distributed.worker - INFO - Local Directory: /home/jovyan/dask-worker-space/dask-worker-space/worker-n1z6_8mr

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO - Starting Worker plugin /tmp/tmphhki7hgc/source.zipe9fe2554-da91-450d-b57f-5b8c711d380a

distributed.worker - INFO - Registered to: tls://dask-ef1cf406cfce473881344f2a34edf0bb.default:8786

distributed.worker - INFO - -------------------------------------------------

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.34.131:44507
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
ConnectionResetError: [Errno 104] Connection reset by peer
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep
    response = await get_data_from_worker(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3249, in _get_data
    response = await send_recv(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 661, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error
    raise CommClosedError(
distributed.comm.core.CommClosedError: in <closed TLS>: ConnectionResetError: [Errno 104] Connection reset by peer

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.34.132:35449
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
ConnectionResetError: [Errno 104] Connection reset by peer
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep
    response = await get_data_from_worker(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3249, in _get_data
    response = await send_recv(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 661, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error
    raise CommClosedError(
distributed.comm.core.CommClosedError: in <closed TLS>: ConnectionResetError: [Errno 104] Connection reset by peer

distributed.worker - INFO - Can't find dependencies for key ('nanmedian-rechunk-merge-broadcast_to-0b63b7fcabad09c477fe8e5865e38c0d', 0, 1, 2)

distributed.worker - INFO - Dependent not found: ('rechunk-split-3481dac58337d358385aaa6ad1e341f9', 52) 0 . Asking scheduler

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.34.129:36957
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
ConnectionResetError: [Errno 104] Connection reset by peer
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep
    response = await get_data_from_worker(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3249, in _get_data
    response = await send_recv(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 661, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error
    raise CommClosedError(
distributed.comm.core.CommClosedError: in <closed TLS>: ConnectionResetError: [Errno 104] Connection reset by peer

distributed.worker - INFO - Can't find dependencies for key ('broadcast_to-nanmedian-rechunk-merge-concatenate-1045156bf8058c8bd52c6b0a8b778545', 1, 1, 1)

distributed.worker - INFO - Dependent not found: ('rechunk-split-3b0f212184eaadfa158b6d51bcbaa0fa', 24) 0 . Asking scheduler

distributed.worker - INFO - Can't find dependencies for key ('where-getitem-c02de02d2e87740a8c06d670a62b3d95', 2, 0, 0)

distributed.worker - INFO - Dependent not found: ('concatenate-f070f6a65ce9a90bf2224f218e406749', 39, 0, 0) 0 . Asking scheduler

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.34.130:46063
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
ConnectionResetError: [Errno 104] Connection reset by peer
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep
    response = await get_data_from_worker(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3249, in _get_data
    response = await send_recv(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 661, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error
    raise CommClosedError(
distributed.comm.core.CommClosedError: in <closed TLS>: ConnectionResetError: [Errno 104] Connection reset by peer

distributed.worker - ERROR - failed during get data with tls://10.244.83.42:35203 -> tls://10.244.34.130:46063
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
TimeoutError: [Errno 110] Connection timed out
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1348, in get_data
    response = await comm.read(deserializers=serializers)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error
    raise CommClosedError(
distributed.comm.core.CommClosedError: in <closed TLS>: TimeoutError: [Errno 110] Connection timed out

distributed.worker - ERROR - failed during get data with tls://10.244.83.42:35203 -> tls://10.244.34.131:44507
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
TimeoutError: [Errno 110] Connection timed out
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1348, in get_data
    response = await comm.read(deserializers=serializers)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error
    raise CommClosedError(
distributed.comm.core.CommClosedError: in <closed TLS>: TimeoutError: [Errno 110] Connection timed out

distributed.worker - ERROR - failed during get data with tls://10.244.83.42:35203 -> tls://10.244.34.129:36957
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 867, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/iostream.py", line 1592, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1241, in recv_into
    return self.read(nbytes, buffer)
  File "/srv/conda/envs/notebook/lib/python3.8/ssl.py", line 1099, in read
    return self._sslobj.read(len, buffer)
TimeoutError: [Errno 110] Connection timed out
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1348, in get_data
    response = await comm.read(deserializers=serializers)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/tcp.py", line 122, in convert_stream_closed_error
    raise CommClosedError(
distributed.comm.core.CommClosedError: in <closed TLS>: TimeoutError: [Errno 110] Connection timed out

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.34.129:36957
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 286, in connect
    comm = await asyncio.wait_for(
  File "/srv/conda/envs/notebook/lib/python3.8/asyncio/tasks.py", line 498, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep
    response = await get_data_from_worker(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3246, in _get_data
    comm = await rpc.connect(worker)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 1026, in connect
    comm = await connect(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
    raise IOError(
OSError: Timed out trying to connect to tls://10.244.34.129:36957 after 10 s

distributed.worker - INFO - Can't find dependencies for key ('broadcast_to-nanmedian-rechunk-merge-concatenate-1045156bf8058c8bd52c6b0a8b778545', 1, 2, 1)

distributed.worker - INFO - Dependent not found: ('rechunk-split-3b0f212184eaadfa158b6d51bcbaa0fa', 24) 1 . Asking scheduler

distributed.worker - INFO - Dependent not found: ('rechunk-split-3b0f212184eaadfa158b6d51bcbaa0fa', 42) 0 . Asking scheduler

distributed.worker - INFO - Can't find dependencies for key ('broadcast_to-nanmedian-rechunk-merge-concatenate-1045156bf8058c8bd52c6b0a8b778545', 1, 1, 1)

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.34.129:36957
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 286, in connect
    comm = await asyncio.wait_for(
  File "/srv/conda/envs/notebook/lib/python3.8/asyncio/tasks.py", line 498, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep
    response = await get_data_from_worker(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3246, in _get_data
    comm = await rpc.connect(worker)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 1026, in connect
    comm = await connect(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
    raise IOError(
OSError: Timed out trying to connect to tls://10.244.34.129:36957 after 10 s

distributed.worker - INFO - Can't find dependencies for key ('nanmedian-rechunk-merge-broadcast_to-43a6170260599ace30eef08cffe2cfbb', 0, 0, 1)

distributed.worker - INFO - Dependent not found: ('rechunk-split-bf9acf326f74741d9689c773f1431c02', 11) 0 . Asking scheduler

distributed.worker - ERROR - Worker stream died during communication: tls://10.244.34.129:36957
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 286, in connect
    comm = await asyncio.wait_for(
  File "/srv/conda/envs/notebook/lib/python3.8/asyncio/tasks.py", line 498, in wait_for
    raise exceptions.TimeoutError()
asyncio.exceptions.TimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 2032, in gather_dep
    response = await get_data_from_worker(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3269, in get_data_from_worker
    return await retry_operation(_get_data, operation="get_data_from_worker")
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 384, in retry_operation
    return await retry(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils_comm.py", line 369, in retry
    return await coro()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 3246, in _get_data
    comm = await rpc.connect(worker)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 1026, in connect
    comm = await connect(
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/comm/core.py", line 308, in connect
    raise IOError(
OSError: Timed out trying to connect to tls://10.244.34.129:36957 after 10 s

distributed.worker - INFO - Can't find dependencies for key ('broadcast_to-nanmedian-rechunk-merge-concatenate-9b098bfbb720d3a9e0a157e6bb364a6a', 6, 2, 1)

distributed.worker - INFO - Dependent not found: ('rechunk-split-94c93b99aa4980a23f31fc1abfbb4d22', 64) 0 . Asking scheduler

@ohiat
Contributor Author

ohiat commented Jan 22, 2021

daskhub helm values:

jupyterhub:
  singleuser:
    image:
      name: cspincregistry.azurecr.io/daskhub
      tag: latest
    memory:
      guarantee: 12G
  cull:
    enabled: false

dask-gateway:
  gateway:
    extraConfig:
      optionHandler: |
        from dask_gateway_server.options import Options, Integer, Float
        def options_handler(options):
            return {
                "worker_cores": options.worker_cores,
                "worker_memory": int(options.worker_memory * 2 ** 30),
                "scheduler_cores": options.scheduler_cores,
                "scheduler_memory": int(options.scheduler_memory * 2 ** 30),
            }

        c.Backend.cluster_options = Options(
            Integer("worker_cores", default=1, min=1, max=8, label="Worker Cores"),
            Float("worker_memory", default=2, min=1, max=16, label="Worker Memory (GiB)"),
            Integer("scheduler_cores", default=1, min=1, max=8, label="Scheduler Cores"),
            Float("scheduler_memory", default=2, min=1, max=16, label="Scheduler Memory (GiB)"),
            handler=options_handler,
        )
      workerNoReconnect: |
        c.ClusterConfig.worker_cmd = ["dask-worker", "--no-reconnect"]
      clusterTimeout: |
        c.ClusterConfig.idle_timeout = 600.0
    backend:
      image:
        name: cspincregistry.azurecr.io/daskhub
        tag: latest
        pullPolicy: IfNotPresent

The Pangeo ML notebook image, which our image is based on, can be used instead.
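
As a rough illustration of what the optionHandler above hands back to the gateway, the sketch below runs the same options_handler body against a stand-in object. SimpleNamespace is only a placeholder for the options object that dask-gateway passes in, and the chosen values are made up; the point is that memory is requested in GiB and converted to bytes.

```python
from types import SimpleNamespace

GIB = 2 ** 30  # bytes per GiB, the same factor used in the handler above


def options_handler(options):
    # Same body as the handler in the helm values: cores pass through,
    # memory (given in GiB) is converted to an integer number of bytes.
    return {
        "worker_cores": options.worker_cores,
        "worker_memory": int(options.worker_memory * GIB),
        "scheduler_cores": options.scheduler_cores,
        "scheduler_memory": int(options.scheduler_memory * GIB),
    }


# Hypothetical user selection, standing in for the real options object.
chosen = SimpleNamespace(
    worker_cores=2, worker_memory=8.0, scheduler_cores=1, scheduler_memory=4.0
)
config = options_handler(chosen)
print(config["worker_memory"])  # 8589934592 bytes, i.e. 8 GiB
```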

@TomAugspurger

Great, thanks. I'm able to at least start playing with the computation now.

A few meta-comments:

  1. It seems like a bunch of the code in hls.compute deals with extracting the
    relevant scene URLs. Hopefully our data query API will completely remove the
    need for most of that.

  2. Ideally you needn't worry about things like limiting the number of "jobs" /
    tasks you submit to the Dask scheduler. Under normal operating conditions,
    it should be smart enough to not assign workers so many tasks that
    they run out of memory. The hard part is in figuring out whether

    a.) the Dask scheduler / worker is wrongly assigning tasks and exhausting
    worker memory, or
    b.) some part of the user code (or any libraries it's calling into)
    is doing something "weird" that's messing up Dask's beliefs about how much
    memory is / should be being used. Hopefully my next section helps clear some
    of this up, but it's a bit fuzzy.

And a few random thoughts:

  1. It's a bit strange to be passing a client around to processing functions like
    get_scene_dataset. Typically not needed unless you're doing something fancy
    like adding tasks from within tasks.

    It's helpful to distinguish operations on metadata from operations on
    data. As far as I can tell, everything in get_scene_dataset,
    fetch_band_url, ... is operating on metadata. It's not actually reading any
    (large) pieces of data. That I/O is lazy since you're using open_rasterio's
    chunks keyword.

    If the metadata operations themselves are slow and need to be parallelized,
    then it's perfectly fine to use dask.delayed or the Client's
    concurrent.futures interface. It seems like this is the case for you. But
    if you're using concurrent.futures I'd recommend a slight refactor to avoid

    def get_scene_dataset(..., client) -> Future:
        futures = client.map(...)
        future = client.submit(xr.merge, futures)
        return future

    and replace it with

    def get_scene_dataset(...) -> xr.Dataset:
        return xr.merge(...)
    
    >>> dataset_future = client.submit(get_scene_dataset, ...)
    >>> dataset = dataset_future.result()  # or client.gather / as_completed / etc.

    See below for motivation, but the tl;dr is it's probably easier to work with
    / debug xarray objects rather than passing around semi-opaque Future objects.

  2. You probably don't need things like client.submit(xr.merge, futures), if
    you combine with point 1. xarray.merge already knows how to operate lazily
    when the data is a Dask array. So you'd replace it with a plain xr.merge
    call on the datasets themselves.
    
  3. Delay execution as long as possible.

    https://docs.dask.org/en/latest/delayed-best-practices.html#compute-on-lots-of-computation-at-once
    documents this for dask.delayed, but I think it's generally useful. As much
    as possible, I'd recommend building up a large, lazy computation on one or
    more xarray Datasets and then calling dask.compute to kick things off. I
    think it reads a bit more clearly, since you can see the shape of the whole
    computation up front, rather than submitting bits and pieces at a time.

  4. What are your chunk shapes going into compute_tile_median? I'm seeing (1, 3660, 3660), but I may have messed something up when scaling the problem down a bit. That's 27MB, which may be a bit small. It really does depend on the computation, but generally you want chunks to be as large as possible for your computation. But this is just a performance optimization we can look at later.
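
The refactor in point 1 can be sketched with the standard library's concurrent.futures, whose submit / Future.result calls distributed.Client follows closely. Everything here is a stand-in: fetch_band and its fake return value are hypothetical, a dict merge takes the place of xr.merge, and ThreadPoolExecutor takes the place of the Dask client.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_band(scene, band):
    # Hypothetical metadata lookup; the real code would resolve a band URL.
    return {band: f"{scene}/{band}.tif"}


def get_scene_dataset(scene, bands):
    # Plain function: no client argument, no futures created inside.
    merged = {}
    for band in bands:
        merged.update(fetch_band(scene, band))  # stand-in for xr.merge
    return merged


scenes = ["scene_a", "scene_b"]
bands = ["B02", "B03"]

# The caller decides how to parallelize; with Dask this would be a Client.
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(get_scene_dataset, s, bands) for s in scenes]
    datasets = [f.result() for f in futures]  # block until each one is done

print(datasets[0])
```

The processing function stays easy to call and debug on its own, and only the outermost caller knows that a pool (or cluster) is involved.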

Combining these points will, I think, make things easier to debug. You'll just
be using normal pandas / xarray methods to build up the computation, and then
handing things off to Dask to execute. My hope is that Dask will generally do
a better job with scheduling (avoiding memory issues) if it gets to see the
whole computation up front.

I haven't had a chance yet to look into the core of your computation
compute_tile_median. I know that in the past there's been some memory issues
with .groupby() indexing into large dask Arrays depending on the pattern of
the indices in the grouper. Will hopefully have a chance this afternoon!

@ohiat
Contributor Author

ohiat commented Jan 22, 2021

Thanks for all the comments and insight on improving computations - we're new to dask and xarray so any pointers are helpful!

meta-comments:

  1. Yeah all of this code (including the cataloging code that I skipped by handing you the catalog zarr) I wrote up because there isn't a STAC or similar API I could use
  2. Yeah this is what I was operating under at first, but I found that there was significant memory pressure on the workers if I didn't limit parallel jobs. This might be due to my approach with futures you mentioned.

Random thoughts:

  1. Yeah the reason for doing this was because the metadata operations in fetch_band_url were slow.
  2. I'll see if this refactor works
  3. I think I got overzealous with futures; I'll look into delayed.
  4. The chunking that gets passed in (1, 3660, 3660) is the size of each COG in the Azure open dataset (one band for one tile). The groupby/median operation also appears to be doing rechunking itself. The dataset we are computing monthly median on becomes ({scenes in a year}, 3660, 3660) + 8 data variables (1 for each band we are interested in) so in 2019 the size would be (~200, 3660, 3660), in 2015 this would be (~60, 3660, 3660).
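
A quick back-of-the-envelope check of these sizes, assuming 2 bytes per pixel (an assumption, chosen because it reproduces the roughly 27MB per-chunk figure mentioned earlier) and taking the approximate scene counts above at face value:

```python
BYTES_PER_PIXEL = 2  # assumption: 16-bit pixels, which matches ~27MB per chunk
TILE_PIXELS = 3660 * 3660
N_BANDS = 8


def size_gb(n_scenes, n_bands=N_BANDS):
    """Uncompressed size in GB (1e9 bytes) of n_bands stacks shaped (n_scenes, 3660, 3660)."""
    return n_scenes * TILE_PIXELS * BYTES_PER_PIXEL * n_bands / 1e9


chunk_mb = TILE_PIXELS * BYTES_PER_PIXEL / 1e6
print(f"one (1, 3660, 3660) chunk: {chunk_mb:.1f} MB")
print(f"2015 (~60 scenes), 8 bands: {size_gb(60):.1f} GB")
print(f"2019 (~200 scenes), 8 bands: {size_gb(200):.1f} GB")
```

Under that assumption a single tile-year comes to roughly 13 GB (2015) to 43 GB (2019) uncompressed, before any intermediate copies made by rechunking.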

@TomAugspurger
Copy link

The groupby/median operation also appears to be doing rechunking itself.

Do you know what the indices in the grouper are like? IIRC it's something like time.month, so is it [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 1, 2, 3, 4, ... 11, 12]?

I'll need to check the latest state of things, but IIRC, that kind of pattern is hard for Dask.Array to do efficiently. You're forced to either generate a bunch of tasks or move around a bunch of data. If we're able to identify this as a problem I'll take a closer look later.
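
To make the access pattern concrete, here is a small pure-Python sketch. It assumes, purely for illustration, one scene per month over three years and one chunk per scene along the time axis; the real scene dates are irregular.

```python
from collections import defaultdict

N_YEARS = 3  # illustrative only
months = [m for _ in range(N_YEARS) for m in range(1, 13)]  # [1..12, 1..12, ...]

# Map each month label to the positions (here also chunk indices) it selects.
positions = defaultdict(list)
for index, month in enumerate(months):
    positions[month].append(index)

for month in (1, 2, 12):
    print(month, positions[month])
```

Each group picks one element out of every run of twelve, so no group maps onto a contiguous block of the time axis.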

Apologies for the lack of good examples here. We'll be contributing more and I'll see if anything comes up. Some of the ones in http://gallery.pangeo.io/ (particularly the landsat gallery) may be worth looking at, but I don't know of any that directly relate to what you're trying to do.

@ohiat
Contributor Author

ohiat commented Jan 22, 2021

Yeah, it's grouping on time.month, so the indices are as you said. The galleries and other examples I've found are helpful for writing an initial computation (like the monthly median for a single tile for a year). It was in scaling that up to many tiles and years (each tile and year is one "job" in my terminology) that I started adding workarounds, such as using futures for fetching metadata, and ended up out on a limb.

@ohiat
Contributor Author

ohiat commented Jan 22, 2021

I refactored so that each job (aka invocation of calculate_job_median) doesn't use futures; it uses dask.delayed to parallelize fetching metadata and then proceeds with normal xarray functions. Each job is then submitted to the cluster by process_catalog. Is there a way I could change how I use futures/client.submit to submit jobs so I don't have to manually handle the number of jobs being processed concurrently?

Each job is a totally independent computation and thus embarrassingly parallel in that sense. Ideally, I would submit all the jobs to the dask cluster at once and have the cluster run only as many at a time as fit in its total available memory. Right now, if I jump from 2 concurrent jobs to 8, the cluster struggles significantly.
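
For reference, the manual approach looks roughly like the sketch below, written with the standard library rather than Dask itself: keep a fixed number of jobs in flight and submit the next one whenever one finishes. run_job and MAX_IN_FLIGHT are hypothetical names, and ThreadPoolExecutor is only a stand-in for the cluster; whether the same pattern behaves well with distributed.Client futures is untested here.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

MAX_IN_FLIGHT = 2  # hypothetical cap on concurrently running jobs


def run_job(job_id):
    # Placeholder for one independent tile/year computation.
    return job_id * job_id


def process_all(job_ids, max_in_flight=MAX_IN_FLIGHT):
    results = {}
    pending = {}  # future -> job_id
    jobs = iter(job_ids)
    with ThreadPoolExecutor(max_workers=max_in_flight) as executor:
        while True:
            # Top up until the cap is reached or there are no jobs left.
            while len(pending) < max_in_flight:
                job_id = next(jobs, None)
                if job_id is None:
                    break
                pending[executor.submit(run_job, job_id)] = job_id
            if not pending:
                break
            # Wait for at least one job to finish, then collect it.
            done, _ = wait(list(pending), return_when=FIRST_COMPLETED)
            for future in done:
                results[pending.pop(future)] = future.result()
    return results


results = process_all(range(5))
print(sorted(results.items()))
```

This is the bookkeeping I'd like the cluster to take over, since the right cap depends on memory per job rather than on a fixed job count.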
