From 2b6dfe414a4c592200ccfcc6426708d4a781a9d4 Mon Sep 17 00:00:00 2001 From: Eric Ernst Date: Tue, 9 Nov 2021 10:56:39 -0800 Subject: [PATCH 1/3] watchers: don't dereference symlinks when copying files The current implementation just copies the file, dereferencing any simlinks in the process. This results in symlinks no being preserved, and a change in layout relative to the mount that we are making watchable. What we want is something like "cp -d" This isn't available in a crate, so let's go ahead and introduce a copy function which will create a symlink with same relative path if the source file is a symlink. Regular files are handled with the standard fs::copy. Introduce a unit test to verify symlinks are now handled appropriately. Fixes: #2950 Signed-off-by: Eric Ernst --- src/agent/src/watcher.rs | 109 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 101 insertions(+), 8 deletions(-) diff --git a/src/agent/src/watcher.rs b/src/agent/src/watcher.rs index b111aa166..187d85ccc 100644 --- a/src/agent/src/watcher.rs +++ b/src/agent/src/watcher.rs @@ -79,6 +79,16 @@ impl Drop for Storage { } } +async fn copy(from: impl AsRef, to: impl AsRef) -> Result<()> { + // if source is a symlink, just create new symlink with same link source + if fs::symlink_metadata(&from).await?.file_type().is_symlink() { + fs::symlink(fs::read_link(&from).await?, to).await?; + } else { + fs::copy(from, to).await?; + } + Ok(()) +} + impl Storage { async fn new(storage: protos::Storage) -> Result { let entry = Storage { @@ -110,19 +120,13 @@ impl Storage { dest_file_path }; - debug!( - logger, - "Copy from {} to {}", - source_file_path.display(), - dest_file_path.display() - ); - fs::copy(&source_file_path, &dest_file_path) + copy(&source_file_path, &dest_file_path) .await .with_context(|| { format!( "Copy from {} to {} failed", source_file_path.display(), - dest_file_path.display() + dest_file_path.display(), ) })?; @@ -843,6 +847,95 @@ mod tests { } } + #[tokio::test] + async fn test_copy() { + // prepare tmp src/destination + let source_dir = tempfile::tempdir().unwrap(); + let dest_dir = tempfile::tempdir().unwrap(); + + // verify copy of a regular file + let src_file = source_dir.path().join("file.txt"); + let dst_file = dest_dir.path().join("file.txt"); + fs::write(&src_file, "foo").unwrap(); + copy(&src_file, &dst_file).await.unwrap(); + // verify destination: + assert!(!fs::symlink_metadata(dst_file) + .unwrap() + .file_type() + .is_symlink()); + + // verify copy of a symlink + let src_symlink_file = source_dir.path().join("symlink_file.txt"); + let dst_symlink_file = dest_dir.path().join("symlink_file.txt"); + tokio::fs::symlink(&src_file, &src_symlink_file) + .await + .unwrap(); + copy(src_symlink_file, &dst_symlink_file).await.unwrap(); + // verify destination: + assert!(fs::symlink_metadata(&dst_symlink_file) + .unwrap() + .file_type() + .is_symlink()); + assert_eq!(fs::read_link(&dst_symlink_file).unwrap(), src_file); + assert_eq!(fs::read_to_string(&dst_symlink_file).unwrap(), "foo") + } + + #[tokio::test] + async fn watch_directory_with_symlinks() { + // Prepare source directory: + // ./tmp/.data/file.txt + // ./tmp/1.txt -> ./tmp/.data/file.txt + let source_dir = tempfile::tempdir().unwrap(); + fs::create_dir_all(source_dir.path().join(".data")).unwrap(); + fs::write(source_dir.path().join(".data/file.txt"), "two").unwrap(); + tokio::fs::symlink( + source_dir.path().join(".data/file.txt"), + source_dir.path().join("1.txt"), + ) + .await + .unwrap(); + + let dest_dir = tempfile::tempdir().unwrap(); + + let mut entry = Storage::new(protos::Storage { + source: source_dir.path().display().to_string(), + mount_point: dest_dir.path().display().to_string(), + ..Default::default() + }) + .await + .unwrap(); + + let logger = slog::Logger::root(slog::Discard, o!()); + + assert_eq!(entry.scan(&logger).await.unwrap(), 2); + + // Should copy no files since nothing is changed since last check + assert_eq!(entry.scan(&logger).await.unwrap(), 0); + + // Should copy 1 file + thread::sleep(Duration::from_secs(1)); + fs::write(source_dir.path().join(".data/file.txt"), "updated").unwrap(); + assert_eq!(entry.scan(&logger).await.unwrap(), 2); + assert_eq!( + fs::read_to_string(dest_dir.path().join(".data/file.txt")).unwrap(), + "updated" + ); + assert_eq!( + fs::read_to_string(dest_dir.path().join("1.txt")).unwrap(), + "updated" + ); + + // Verify that resulting 1.txt is a symlink: + assert!(tokio::fs::symlink_metadata(dest_dir.path().join("1.txt")) + .await + .unwrap() + .file_type() + .is_symlink()); + + // Should copy no new files after copy happened + assert_eq!(entry.scan(&logger).await.unwrap(), 0); + } + #[tokio::test] async fn watch_directory() { // Prepare source directory: From 296e76f8eea4c070642b496e134f8c10f6872b4c Mon Sep 17 00:00:00 2001 From: Eric Ernst Date: Mon, 15 Nov 2021 14:46:09 -0800 Subject: [PATCH 2/3] watchers: handle symlinked directories, dir removal - Even a directory could be a symlink - check for this. This is very common when using configmaps/secrets - Add unit test to better mimic a configmap, configmap update - We would never remove directories before. Let's ensure that these are added to the watched_list, and verify in unit tests - Update unit tests which exercise maximum number of files per entry. There's a change in behavior now that we consider directories/symlinks watchable as well. For these tests, it means we support one less file in a watchable mount. Signed-off-by: Eric Ernst --- src/agent/src/watcher.rs | 206 ++++++++++++++++++++++++++++++--------- 1 file changed, 159 insertions(+), 47 deletions(-) diff --git a/src/agent/src/watcher.rs b/src/agent/src/watcher.rs index 187d85ccc..fd0f9fe86 100644 --- a/src/agent/src/watcher.rs +++ b/src/agent/src/watcher.rs @@ -49,7 +49,7 @@ struct Storage { /// the source becomes too large, either in number of files (>16) or total size (>1MB). watch: bool, - /// The list of files to watch from the source mount point and updated in the target one. + /// The list of files, directories, symlinks to watch from the source mount point and updated in the target one. watched_files: HashMap, } @@ -80,9 +80,13 @@ impl Drop for Storage { } async fn copy(from: impl AsRef, to: impl AsRef) -> Result<()> { - // if source is a symlink, just create new symlink with same link source if fs::symlink_metadata(&from).await?.file_type().is_symlink() { - fs::symlink(fs::read_link(&from).await?, to).await?; + // if source is a symlink, create new symlink with same link source. If + // the symlink exists, remove and create new one: + if fs::symlink_metadata(&to).await.is_ok() { + fs::remove_file(&to).await?; + } + fs::symlink(fs::read_link(&from).await?, &to).await?; } else { fs::copy(from, to).await?; } @@ -103,6 +107,16 @@ impl Storage { async fn update_target(&self, logger: &Logger, source_path: impl AsRef) -> Result<()> { let source_file_path = source_path.as_ref(); + // if we are creating a directory: just create it, nothing more to do + if source_file_path.symlink_metadata()?.file_type().is_dir() { + fs::create_dir_all(source_file_path) + .await + .with_context(|| { + format!("Unable to mkdir all for {}", source_file_path.display()) + })? + } + + // Assume we are dealing with either a file or a symlink now: let dest_file_path = if self.source_mount_point.is_file() { // Simple file to file copy // Assume target mount is a file path @@ -139,7 +153,7 @@ impl Storage { let mut remove_list = Vec::new(); let mut updated_files: Vec = Vec::new(); - // Remove deleted files for tracking list + // Remove deleted files for tracking list. self.watched_files.retain(|st, _| { if st.exists() { true @@ -151,10 +165,19 @@ impl Storage { // Delete from target for path in remove_list { - // File has been deleted, remove it from target mount let target = self.make_target_path(path)?; - debug!(logger, "Removing file from mount: {}", target.display()); - let _ = fs::remove_file(target).await; + // The target may be a directory or a file. If it is a directory that is removed, + // we'll remove all files under that directory as well. Because of this, there's a + // chance the target (a subdirectory or file under a prior removed target) was already + // removed. Make sure we check if the target exists before checking the metadata, and + // don't return an error if the remove fails + if target.exists() && target.symlink_metadata()?.file_type().is_dir() { + debug!(logger, "Removing a directory: {}", target.display()); + let _ = fs::remove_dir_all(target).await; + } else { + debug!(logger, "Removing a file: {}", target.display()); + let _ = fs::remove_file(target).await; + } } // Scan new & changed files @@ -186,15 +209,16 @@ impl Storage { let mut size: u64 = 0; debug!(logger, "Scanning path: {}", path.display()); - if path.is_file() { - let metadata = path - .metadata() - .with_context(|| format!("Failed to query metadata for: {}", path.display()))?; + let metadata = path + .symlink_metadata() + .with_context(|| format!("Failed to query metadata for: {}", path.display()))?; - let modified = metadata - .modified() - .with_context(|| format!("Failed to get modified date for: {}", path.display()))?; + let modified = metadata + .modified() + .with_context(|| format!("Failed to get modified date for: {}", path.display()))?; + // Treat files and symlinks the same: + if path.is_file() || metadata.file_type().is_symlink() { size += metadata.len(); // Insert will return old entry if any @@ -216,6 +240,16 @@ impl Storage { } ); } else { + // Handling regular directories - check to see if this directory is already being tracked, and + // track if not: + if self + .watched_files + .insert(path.to_path_buf(), modified) + .is_none() + { + update_list.push(path.to_path_buf()); + } + // Scan dir recursively let mut entries = fs::read_dir(path) .await @@ -616,7 +650,7 @@ mod tests { .unwrap(); // setup storage3: many files, but still watchable - for i in 1..MAX_ENTRIES_PER_STORAGE + 1 { + for i in 1..MAX_ENTRIES_PER_STORAGE { fs::write(src3_path.join(format!("{}.txt", i)), "original").unwrap(); } @@ -678,7 +712,7 @@ mod tests { std::fs::read_dir(entries.0[3].target_mount_point.as_path()) .unwrap() .count(), - MAX_ENTRIES_PER_STORAGE + MAX_ENTRIES_PER_STORAGE - 1 ); // Add two files to storage 0, verify it is updated without needing to run check: @@ -748,7 +782,7 @@ mod tests { std::fs::read_dir(entries.0[3].target_mount_point.as_path()) .unwrap() .count(), - MAX_ENTRIES_PER_STORAGE + 1 + MAX_ENTRIES_PER_STORAGE ); // verify that we can remove files as well, but that it isn't observed until check is run @@ -826,15 +860,20 @@ mod tests { fs::remove_file(source_dir.path().join("big.txt")).unwrap(); fs::remove_file(source_dir.path().join("too-big.txt")).unwrap(); - // Up to 16 files should be okay: - for i in 1..MAX_ENTRIES_PER_STORAGE + 1 { + assert_eq!(entry.scan(&logger).await.unwrap(), 0); + + // Up to 15 files should be okay (can watch 15 files + 1 directory) + for i in 1..MAX_ENTRIES_PER_STORAGE { fs::write(source_dir.path().join(format!("{}.txt", i)), "original").unwrap(); } - assert_eq!(entry.scan(&logger).await.unwrap(), MAX_ENTRIES_PER_STORAGE); + assert_eq!( + entry.scan(&logger).await.unwrap(), + MAX_ENTRIES_PER_STORAGE - 1 + ); - // 17 files is too many: - fs::write(source_dir.path().join("17.txt"), "updated").unwrap(); + // 16 files wll be too many: + fs::write(source_dir.path().join("16.txt"), "updated").unwrap(); thread::sleep(Duration::from_secs(1)); // Expect to receive a MountTooManyFiles error @@ -881,19 +920,67 @@ mod tests { } #[tokio::test] - async fn watch_directory_with_symlinks() { - // Prepare source directory: - // ./tmp/.data/file.txt - // ./tmp/1.txt -> ./tmp/.data/file.txt + async fn watch_directory_verify_dir_removal() { let source_dir = tempfile::tempdir().unwrap(); - fs::create_dir_all(source_dir.path().join(".data")).unwrap(); - fs::write(source_dir.path().join(".data/file.txt"), "two").unwrap(); - tokio::fs::symlink( - source_dir.path().join(".data/file.txt"), - source_dir.path().join("1.txt"), - ) + let dest_dir = tempfile::tempdir().unwrap(); + + let mut entry = Storage::new(protos::Storage { + source: source_dir.path().display().to_string(), + mount_point: dest_dir.path().display().to_string(), + ..Default::default() + }) .await .unwrap(); + let logger = slog::Logger::root(slog::Discard, o!()); + + // create a path we'll remove later + fs::create_dir_all(source_dir.path().join("tmp")).unwrap(); + fs::write(&source_dir.path().join("tmp/test-file"), "foo").unwrap(); + assert_eq!(entry.scan(&logger).await.unwrap(), 3); // root, ./tmp, test-file + + // Verify expected directory, file: + assert_eq!( + std::fs::read_dir(dest_dir.path().join("tmp")) + .unwrap() + .count(), + 1 + ); + assert_eq!(std::fs::read_dir(&dest_dir).unwrap().count(), 1); + + // Now, remove directory, and verify that the directory (and its file) are removed: + fs::remove_dir_all(source_dir.path().join("tmp")).unwrap(); + thread::sleep(Duration::from_secs(1)); + assert_eq!(entry.scan(&logger).await.unwrap(), 0); + + assert_eq!(std::fs::read_dir(&dest_dir).unwrap().count(), 0); + + assert_eq!(entry.scan(&logger).await.unwrap(), 0); + } + + #[tokio::test] + async fn watch_directory_with_symlinks() { + // Prepare source directory: + // ..2021_10_29_03_10_48.161654083/file.txt + // ..data -> ..2021_10_29_03_10_48.161654083 + // file.txt -> ..data/file.txt + + let source_dir = tempfile::tempdir().unwrap(); + let actual_dir = source_dir.path().join("..2021_10_29_03_10_48.161654083"); + let actual_file = actual_dir.join("file.txt"); + let sym_dir = source_dir.path().join("..data"); + let sym_file = source_dir.path().join("file.txt"); + + // create backing file/path + fs::create_dir_all(&actual_dir).unwrap(); + fs::write(&actual_file, "two").unwrap(); + + // create indirection symlink directory tha points to actual_dir: + tokio::fs::symlink(&actual_dir, &sym_dir).await.unwrap(); + + // create presented data file symlink: + tokio::fs::symlink(sym_dir.join("file.txt"), sym_file) + .await + .unwrap(); let dest_dir = tempfile::tempdir().unwrap(); @@ -907,26 +994,31 @@ mod tests { let logger = slog::Logger::root(slog::Discard, o!()); - assert_eq!(entry.scan(&logger).await.unwrap(), 2); + assert_eq!(entry.scan(&logger).await.unwrap(), 5); // Should copy no files since nothing is changed since last check assert_eq!(entry.scan(&logger).await.unwrap(), 0); - // Should copy 1 file + // now what, what is updated? + fs::write(actual_file, "updated").unwrap(); thread::sleep(Duration::from_secs(1)); - fs::write(source_dir.path().join(".data/file.txt"), "updated").unwrap(); - assert_eq!(entry.scan(&logger).await.unwrap(), 2); + assert_eq!(entry.scan(&logger).await.unwrap(), 1); assert_eq!( - fs::read_to_string(dest_dir.path().join(".data/file.txt")).unwrap(), - "updated" - ); - assert_eq!( - fs::read_to_string(dest_dir.path().join("1.txt")).unwrap(), + fs::read_to_string(dest_dir.path().join("file.txt")).unwrap(), "updated" ); - // Verify that resulting 1.txt is a symlink: - assert!(tokio::fs::symlink_metadata(dest_dir.path().join("1.txt")) + // Verify that resulting file.txt is a symlink: + assert!( + tokio::fs::symlink_metadata(dest_dir.path().join("file.txt")) + .await + .unwrap() + .file_type() + .is_symlink() + ); + + // Verify that .data directory is a symlink: + assert!(tokio::fs::symlink_metadata(&dest_dir.path().join("..data")) .await .unwrap() .file_type() @@ -934,6 +1026,25 @@ mod tests { // Should copy no new files after copy happened assert_eq!(entry.scan(&logger).await.unwrap(), 0); + + // Now, simulate configmap update. + // - create a new actual dir/file, + // - update the symlink directory to point to this one + // - remove old dir/file + let new_actual_dir = source_dir.path().join("..2021_10_31_03_10_48.161654083"); + let new_actual_file = new_actual_dir.join("file.txt"); + fs::create_dir_all(&new_actual_dir).unwrap(); + fs::write(&new_actual_file, "new configmap").unwrap(); + + tokio::fs::remove_file(&sym_dir).await.unwrap(); + tokio::fs::symlink(&new_actual_dir, &sym_dir).await.unwrap(); + tokio::fs::remove_dir_all(&actual_dir).await.unwrap(); + + assert_eq!(entry.scan(&logger).await.unwrap(), 3); // file, file-dir, symlink + assert_eq!( + fs::read_to_string(dest_dir.path().join("file.txt")).unwrap(), + "new configmap" + ); } #[tokio::test] @@ -958,7 +1069,7 @@ mod tests { let logger = slog::Logger::root(slog::Discard, o!()); - assert_eq!(entry.scan(&logger).await.unwrap(), 2); + assert_eq!(entry.scan(&logger).await.unwrap(), 5); // Should copy no files since nothing is changed since last check assert_eq!(entry.scan(&logger).await.unwrap(), 0); @@ -1028,8 +1139,9 @@ mod tests { let logger = slog::Logger::root(slog::Discard, o!()); - assert_eq!(entry.scan(&logger).await.unwrap(), 1); - assert_eq!(entry.watched_files.len(), 1); + // expect the root directory and the file: + assert_eq!(entry.scan(&logger).await.unwrap(), 2); + assert_eq!(entry.watched_files.len(), 2); assert!(target_file.exists()); assert!(entry.watched_files.contains_key(&source_file)); @@ -1039,7 +1151,7 @@ mod tests { assert_eq!(entry.scan(&logger).await.unwrap(), 0); - assert_eq!(entry.watched_files.len(), 0); + assert_eq!(entry.watched_files.len(), 1); assert!(!target_file.exists()); } From b5dfcf26538d6262a016698550f11ed15b742090 Mon Sep 17 00:00:00 2001 From: Eric Ernst Date: Thu, 18 Nov 2021 13:49:54 -0800 Subject: [PATCH 3/3] watcher: tests: ensure there is 20ms delay between fs writes We noticed s390x test failures on several of the watcher unit tests. Discovered that on s390 in particular, if we update a file in quick sucecssion, the time stampe on the file would not be unique between the writes. Through testing, we observe that a 20 millisecond delay is very reliable for being able to observe the timestamp update. Let's ensure we have this delay between writes for our tests so our tests are more reliable. In "the real world" we'll be polling for changes every 2 seconds, and frequency of filesystem updates will be on order of minutes and days, rather that microseconds. Fixes: #2946 Signed-off-by: Eric Ernst --- src/agent/src/watcher.rs | 45 ++++++++++++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/src/agent/src/watcher.rs b/src/agent/src/watcher.rs index fd0f9fe86..bb3fb1641 100644 --- a/src/agent/src/watcher.rs +++ b/src/agent/src/watcher.rs @@ -660,6 +660,9 @@ mod tests { ..Default::default() }; + // delay 20 ms between writes to files in order to ensure filesystem timestamps are unique + thread::sleep(Duration::from_millis(20)); + entries .add(std::iter::once(storage0), &logger) .await @@ -730,6 +733,9 @@ mod tests { "updated" ); + // delay 20 ms between writes to files in order to ensure filesystem timestamps are unique + thread::sleep(Duration::from_millis(20)); + // // Prepare for second check: update mount sources // @@ -970,20 +976,27 @@ mod tests { let sym_dir = source_dir.path().join("..data"); let sym_file = source_dir.path().join("file.txt"); + let relative_to_dir = PathBuf::from("..2021_10_29_03_10_48.161654083"); + // create backing file/path fs::create_dir_all(&actual_dir).unwrap(); fs::write(&actual_file, "two").unwrap(); - // create indirection symlink directory tha points to actual_dir: - tokio::fs::symlink(&actual_dir, &sym_dir).await.unwrap(); + // create indirection symlink directory that points to the directory that holds the actual file: + tokio::fs::symlink(&relative_to_dir, &sym_dir) + .await + .unwrap(); // create presented data file symlink: - tokio::fs::symlink(sym_dir.join("file.txt"), sym_file) + tokio::fs::symlink(PathBuf::from("..data/file.txt"), sym_file) .await .unwrap(); let dest_dir = tempfile::tempdir().unwrap(); + // delay 20 ms between writes to files in order to ensure filesystem timestamps are unique + thread::sleep(Duration::from_millis(20)); + let mut entry = Storage::new(protos::Storage { source: source_dir.path().display().to_string(), mount_point: dest_dir.path().display().to_string(), @@ -1001,8 +1014,12 @@ mod tests { // now what, what is updated? fs::write(actual_file, "updated").unwrap(); - thread::sleep(Duration::from_secs(1)); + + // delay 20 ms between writes to files in order to ensure filesystem timestamps are unique + thread::sleep(Duration::from_millis(20)); + assert_eq!(entry.scan(&logger).await.unwrap(), 1); + assert_eq!( fs::read_to_string(dest_dir.path().join("file.txt")).unwrap(), "updated" @@ -1031,13 +1048,15 @@ mod tests { // - create a new actual dir/file, // - update the symlink directory to point to this one // - remove old dir/file - let new_actual_dir = source_dir.path().join("..2021_10_31_03_10_48.161654083"); + let new_actual_dir = source_dir.path().join("..2021_10_31"); let new_actual_file = new_actual_dir.join("file.txt"); fs::create_dir_all(&new_actual_dir).unwrap(); fs::write(&new_actual_file, "new configmap").unwrap(); tokio::fs::remove_file(&sym_dir).await.unwrap(); - tokio::fs::symlink(&new_actual_dir, &sym_dir).await.unwrap(); + tokio::fs::symlink(PathBuf::from("..2021_10_31"), &sym_dir) + .await + .unwrap(); tokio::fs::remove_dir_all(&actual_dir).await.unwrap(); assert_eq!(entry.scan(&logger).await.unwrap(), 3); // file, file-dir, symlink @@ -1057,6 +1076,9 @@ mod tests { fs::create_dir_all(source_dir.path().join("A/B")).unwrap(); fs::write(source_dir.path().join("A/B/1.txt"), "two").unwrap(); + // delay 20 ms between writes to files in order to ensure filesystem timestamps are unique + thread::sleep(Duration::from_millis(20)); + let dest_dir = tempfile::tempdir().unwrap(); let mut entry = Storage::new(protos::Storage { @@ -1074,8 +1096,6 @@ mod tests { // Should copy no files since nothing is changed since last check assert_eq!(entry.scan(&logger).await.unwrap(), 0); - // Should copy 1 file - thread::sleep(Duration::from_secs(1)); fs::write(source_dir.path().join("A/B/1.txt"), "updated").unwrap(); assert_eq!(entry.scan(&logger).await.unwrap(), 1); assert_eq!( @@ -1083,6 +1103,9 @@ mod tests { "updated" ); + // delay 20 ms between writes to files in order to ensure filesystem timestamps are unique + thread::sleep(Duration::from_millis(20)); + // Should copy no new files after copy happened assert_eq!(entry.scan(&logger).await.unwrap(), 0); @@ -1113,7 +1136,9 @@ mod tests { assert_eq!(entry.scan(&logger).await.unwrap(), 1); - thread::sleep(Duration::from_secs(1)); + // delay 20 ms between writes to files in order to ensure filesystem timestamps are unique + thread::sleep(Duration::from_millis(20)); + fs::write(&source_file, "two").unwrap(); assert_eq!(entry.scan(&logger).await.unwrap(), 1); assert_eq!(fs::read_to_string(&dest_file).unwrap(), "two"); @@ -1197,6 +1222,8 @@ mod tests { watcher.mount(&logger).await.unwrap(); assert!(is_mounted(WATCH_MOUNT_POINT_PATH).unwrap()); + thread::sleep(Duration::from_millis(20)); + watcher.cleanup(); assert!(!is_mounted(WATCH_MOUNT_POINT_PATH).unwrap()); }